http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java index 2da5771,e4bc193..c4ed7a0 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java @@@ -34,8 -42,7 +34,10 @@@ import org.apache.phoenix.hbase.index.c import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; import org.apache.phoenix.hbase.index.scanner.Scanner; import org.apache.phoenix.hbase.index.scanner.ScannerBuilder; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; + ++import com.google.inject.Key; + /** * Manage the state of the HRegion's view of the table, for the single row. * <p> @@@ -46,230 -55,190 +48,230 @@@ */ public class LocalTableState implements TableState { - private long ts; - private RegionCoprocessorEnvironment env; - private KeyValueStore memstore; - private LocalHBaseState table; - private Mutation update; - private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>(); - private ScannerBuilder scannerBuilder; - private List<KeyValue> kvs = new ArrayList<KeyValue>(); - private List<? extends IndexedColumnGroup> hints; - private CoveredColumns columnSet; - - public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) { - this.env = environment; - this.table = table; - this.update = update; - this.memstore = new IndexMemStore(); - this.scannerBuilder = new ScannerBuilder(memstore, update); - this.columnSet = new CoveredColumns(); - } - - public void addPendingUpdates(KeyValue... 
kvs) { - if (kvs == null) return; - addPendingUpdates(Arrays.asList(kvs)); - } - - public void addPendingUpdates(List<KeyValue> kvs) { - if(kvs == null) return; - setPendingUpdates(kvs); - addUpdate(kvs); - } - - private void addUpdate(List<KeyValue> list) { - addUpdate(list, true); - } - - private void addUpdate(List<KeyValue> list, boolean overwrite) { - if (list == null) return; - for (KeyValue kv : list) { - this.memstore.add(kv, overwrite); - } - } - - @Override - public RegionCoprocessorEnvironment getEnvironment() { - return this.env; - } - - @Override - public long getCurrentTimestamp() { - return this.ts; - } - - @Override - public void setCurrentTimestamp(long timestamp) { - this.ts = timestamp; - } - - public void resetTrackedColumns() { - this.trackedColumns.clear(); - } - - public Set<ColumnTracker> getTrackedColumns() { - return this.trackedColumns; - } - - @Override - public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState( - Collection<? extends ColumnReference> indexedColumns) throws IOException { - ensureLocalStateInitialized(indexedColumns); - // filter out things with a newer timestamp and track the column references to which it applies - ColumnTracker tracker = new ColumnTracker(indexedColumns); - synchronized (this.trackedColumns) { - // we haven't seen this set of columns before, so we need to create a new tracker - if (!this.trackedColumns.contains(tracker)) { - this.trackedColumns.add(tracker); - } - } - - Scanner scanner = - this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts); - - return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker)); - } - - /** - * Initialize the managed local state. Generally, this will only be called by - * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. - * Even then, there is still fairly low contention as each new Put/Delete will have its own table - * state. 
- */ - private synchronized void ensureLocalStateInitialized( - Collection<? extends ColumnReference> columns) throws IOException { - // check to see if we haven't initialized any columns yet - Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns); - // we have all the columns loaded, so we are good to go. - if (toCover.isEmpty()) { - return; - } - - // add the current state of the row - this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false); - - // add the covered columns to the set - for (ColumnReference ref : toCover) { - this.columnSet.addColumn(ref); - } - } - - @Override - public Map<String, byte[]> getUpdateAttributes() { - return this.update.getAttributesMap(); - } - - @Override - public byte[] getCurrentRowKey() { - return this.update.getRow(); - } - - public Result getCurrentRowState() { - KeyValueScanner scanner = this.memstore.getScanner(); - List<Cell> kvs = new ArrayList<Cell>(); - while (scanner.peek() != null) { - try { - kvs.add(scanner.next()); - } catch (IOException e) { - // this should never happen - something has gone terribly arwy if it has - throw new RuntimeException("Local MemStore threw IOException!"); - } - } - return Result.create(kvs); - } - - /** - * Helper to add a {@link Mutation} to the values stored for the current row - * @param pendingUpdate update to apply - */ - public void addUpdateForTesting(Mutation pendingUpdate) { - for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) { - List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue()); - addUpdate(edits); - } - } - - /** - * @param hints - */ - public void setHints(List<? extends IndexedColumnGroup> hints) { - this.hints = hints; - } - - @Override - public List<? 
extends IndexedColumnGroup> getIndexColumnHints() { - return this.hints; - } - - @Override - public Collection<KeyValue> getPendingUpdate() { - return this.kvs; - } - - /** - * Set the {@link KeyValue}s in the update for which we are currently building an index update, - * but don't actually apply them. - * @param update pending {@link KeyValue}s - */ - public void setPendingUpdates(Collection<KeyValue> update) { - this.kvs.clear(); - this.kvs.addAll(update); - } - - /** - * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}. - */ - public void applyPendingUpdates() { - this.addUpdate(kvs); - } - - /** - * Rollback all the given values from the underlying state. - * @param values - */ - public void rollback(Collection<KeyValue> values) { - for (KeyValue kv : values) { - this.memstore.rollback(kv); - } - } + private long ts; + private RegionCoprocessorEnvironment env; + private KeyValueStore memstore; + private LocalHBaseState table; + private Mutation update; + private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>(); + private ScannerBuilder scannerBuilder; - private List<Cell> kvs = new ArrayList<Cell>(); ++ private List<KeyValue> kvs = new ArrayList<KeyValue>(); + private List<? extends IndexedColumnGroup> hints; + private CoveredColumns columnSet; + + public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) { + this.env = environment; + this.table = table; + this.update = update; + this.memstore = new IndexMemStore(); + this.scannerBuilder = new ScannerBuilder(memstore, update); + this.columnSet = new CoveredColumns(); + } + - public void addPendingUpdates(Cell... kvs) { ++ public void addPendingUpdates(KeyValue... 
kvs) { + if (kvs == null) return; + addPendingUpdates(Arrays.asList(kvs)); + } + - public void addPendingUpdates(List<Cell> kvs) { ++ public void addPendingUpdates(List<KeyValue> kvs) { + if (kvs == null) return; + setPendingUpdates(kvs); + addUpdate(kvs); + } + - private void addUpdate(List<Cell> list) { ++ private void addUpdate(List<KeyValue> list) { + addUpdate(list, true); + } + - private void addUpdate(List<Cell> list, boolean overwrite) { ++ private void addUpdate(List<KeyValue> list, boolean overwrite) { + if (list == null) return; - for (Cell kv : list) { - this.memstore.add(KeyValueUtil.ensureKeyValue(kv), overwrite); ++ for (KeyValue kv : list) { ++ this.memstore.add(kv, overwrite); + } + } + + @Override + public RegionCoprocessorEnvironment getEnvironment() { + return this.env; + } + + @Override + public long getCurrentTimestamp() { + return this.ts; + } + + /** + * Set the current timestamp up to which the table should allow access to the underlying table. + * This overrides the timestamp view provided by the indexer - use with care! + * @param timestamp timestamp up to which the table should allow access. + */ + public void setCurrentTimestamp(long timestamp) { + this.ts = timestamp; + } + + public void resetTrackedColumns() { + this.trackedColumns.clear(); + } + + public Set<ColumnTracker> getTrackedColumns() { + return this.trackedColumns; + } + + /** + * Get a scanner on the columns that are needed by the index. + * <p> + * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given + * columns with a timestamp earlier than the timestamp to which the table is currently set (the + * current state of the table for which we need to build an update). + * <p> + * If none of the passed columns matches any of the columns in the pending update (as determined + * by {@link ColumnReference#matchesFamily(byte[])} and + * {@link ColumnReference#matchesQualifier(byte[])}, then an empty scanner will be returned. 
This + * is because it doesn't make sense to build index updates when there is no change in the table + * state for any of the columns you are indexing. + * <p> + * <i>NOTE:</i> This method should <b>not</b> be used during + * {@link IndexCodec#getIndexDeletes(TableState, BatchState)} as the pending update will not yet have been + * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i> + * need to track the indexed columns. + * <p> + * As a side-effect, we update a timestamp for the next-most-recent timestamp for the columns you + * request - you will never see a column with the timestamp we are tracking, but the next oldest + * timestamp for that column. + * @param indexedColumns the columns to that will be indexed + * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to + * the builder. Even if no update is necessary for the requested columns, you still need + * to return the {@link IndexUpdate}, just don't set the update for the + * {@link IndexUpdate}. + * @throws IOException + */ + public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState( + Collection<? extends ColumnReference> indexedColumns) throws IOException { + ensureLocalStateInitialized(indexedColumns); + // filter out things with a newer timestamp and track the column references to which it applies + ColumnTracker tracker = new ColumnTracker(indexedColumns); + synchronized (this.trackedColumns) { + // we haven't seen this set of columns before, so we need to create a new tracker + if (!this.trackedColumns.contains(tracker)) { + this.trackedColumns.add(tracker); + } + } + + Scanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts); + + return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker)); + } + + /** + * Initialize the managed local state. 
Generally, this will only be called by + * {@link #getIndexedColumnsTableState(Collection)}, which is unlikely to be called concurrently from the outside. Even + * then, there is still fairly low contention as each new Put/Delete will have its own table state. + */ + private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns) + throws IOException { + // check to see if we haven't initialized any columns yet + Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns); + // we have all the columns loaded, so we are good to go. + if (toCover.isEmpty()) { return; } + + // add the current state of the row - this.addUpdate(this.table.getCurrentRowState(update, toCover).listCells(), false); ++ this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false); + + // add the covered columns to the set + for (ColumnReference ref : toCover) { + this.columnSet.addColumn(ref); + } + } + + @Override + public Map<String, byte[]> getUpdateAttributes() { + return this.update.getAttributesMap(); + } + + @Override + public byte[] getCurrentRowKey() { + return this.update.getRow(); + } + + public Result getCurrentRowState() { + KeyValueScanner scanner = this.memstore.getScanner(); + List<Cell> kvs = new ArrayList<Cell>(); + while (scanner.peek() != null) { + try { + kvs.add(scanner.next()); + } catch (IOException e) { + // this should never happen - something has gone terribly awry if it has + throw new RuntimeException("Local MemStore threw IOException!"); + } + } + return Result.create(kvs); + } + + /** + * Helper to add a {@link Mutation} to the values stored for the current row + * + * @param pendingUpdate + * update to apply + */ + public void addUpdateForTesting(Mutation pendingUpdate) { + for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) { - List<Cell> edits = e.getValue(); ++ List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue()); + 
addUpdate(edits); + } + } + + /** + * @param hints + */ + public void setHints(List<? extends IndexedColumnGroup> hints) { + this.hints = hints; + } + + @Override + public List<? extends IndexedColumnGroup> getIndexColumnHints() { + return this.hints; + } + + @Override - public Collection<Cell> getPendingUpdate() { ++ public Collection<KeyValue> getPendingUpdate() { + return this.kvs; + } + + /** + * Set the {@link KeyValue}s in the update for which we are currently building an index update, but don't actually + * apply them. + * + * @param update + * pending {@link KeyValue}s + */ - public void setPendingUpdates(Collection<Cell> update) { ++ public void setPendingUpdates(Collection<KeyValue> update) { + this.kvs.clear(); + this.kvs.addAll(update); + } + + /** + * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}. + */ + public void applyPendingUpdates() { + this.addUpdate(kvs); + } + + /** + * Rollback all the given values from the underlying state. + * + * @param values + */ - public void rollback(Collection<Cell> values) { - for (Cell kv : values) { - this.memstore.rollback(KeyValueUtil.ensureKeyValue(kv)); ++ public void rollback(Collection<KeyValue> values) { ++ for (KeyValue kv : values) { ++ this.memstore.rollback(kv); + } + } + + @Override + public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns) + throws IOException { + Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns); + ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey()); + return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond()); + } }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java index 11e7d1a,0000000..27af40f mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java @@@ -1,404 -1,0 +1,404 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. 
+ */ +package org.apache.phoenix.hbase.index.covered; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder; +import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; +import org.apache.phoenix.hbase.index.covered.data.LocalTable; +import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; +import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager; +import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; + +/** + * Build covered indexes for phoenix updates. + * <p> + * Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't need to do any extra + * synchronization in the IndexBuilder. + * <p> + * NOTE: This implementation doesn't cleanup the index when we remove a key-value on compaction or flush, leading to a + * bloated index that needs to be cleaned up by a background process. 
+ */ +public class NonTxIndexBuilder extends BaseIndexBuilder { + private static final Log LOG = LogFactory.getLog(NonTxIndexBuilder.class); + + protected LocalHBaseState localTable; + + @Override + public void setup(RegionCoprocessorEnvironment env) throws IOException { + super.setup(env); + this.localTable = new LocalTable(env); + } + + @Override + public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException { + // create a state manager, so we can manage each batch + LocalTableState state = new LocalTableState(env, localTable, mutation); + // build the index updates for each group + IndexUpdateManager manager = new IndexUpdateManager(); + + batchMutationAndAddUpdates(manager, state, mutation, indexMetaData); + + if (LOG.isDebugEnabled()) { + LOG.debug("Found index updates for Mutation: " + mutation + "\n" + manager); + } + + return manager.toMap(); + } + + /** + * Split the mutation into batches based on the timestamps of each keyvalue. We need to check each key-value in the + * update to see if it matches the others. Generally, this will be the case, but you can add kvs to a mutation that + * don't all have the same timestamp, so we need to manage everything in batches based on timestamp. + * <p> + * Adds all the updates in the {@link Mutation} to the state, as a side-effect. + * @param state + * current state of the row for the mutation. + * @param m + * mutation to batch + * @param indexMetaData TODO + * @param manager + * index updates into which to add new updates. Modified as a side-effect. 
+ * + * @throws IOException + */ + private void batchMutationAndAddUpdates(IndexUpdateManager manager, LocalTableState state, Mutation m, IndexMetaData indexMetaData) throws IOException { + // split the mutation into timestamp-based batches + Collection<Batch> batches = createTimestampBatchesFromMutation(m); + + // go through each batch of keyvalues and build separate index entries for each + boolean cleanupCurrentState = true; + for (Batch batch : batches) { + /* + * We have to split the work between the cleanup and the update for each group because when we update the + * current state of the row for the current batch (appending the mutations for the current batch) the next + * group will see that as the current state, which can cause a delete and a put to be created for + * the next group. + */ + if (addMutationsForBatch(manager, batch, state, cleanupCurrentState, indexMetaData)) { + cleanupCurrentState = false; + } + } + } + + /** + * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any {@link KeyValue} with a timestamp + * == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called. 
+ * + * @param m + * {@link Mutation} from which to extract the {@link KeyValue}s + * @return the mutation, broken into batches and sorted in ascending order (smallest first) + */ + protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) { + Map<Long, Batch> batches = new HashMap<Long, Batch>(); + for (List<Cell> family : m.getFamilyCellMap().values()) { + List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family); + createTimestampBatchesFromKeyValues(familyKVs, batches); + } + // sort the batches + List<Batch> sorted = new ArrayList<Batch>(batches.values()); + Collections.sort(sorted, new Comparator<Batch>() { + @Override + public int compare(Batch o1, Batch o2) { + return Longs.compare(o1.getTimestamp(), o2.getTimestamp()); + } + }); + return sorted; + } + + /** + * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any {@link KeyValue} with a + * timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called. 
+ * + * @param kvs + * {@link KeyValue}s to break into batches + * @param batches + * to update with the given kvs + */ + protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs, Map<Long, Batch> batches) { - long now = EnvironmentEdgeManager.currentTimeMillis(); ++ long now = EnvironmentEdgeManager.currentTime(); + byte[] nowBytes = Bytes.toBytes(now); + + // batch kvs by timestamp + for (KeyValue kv : kvs) { + long ts = kv.getTimestamp(); + // override the timestamp to the current time, so the index and primary tables match + // all the keys with LATEST_TIMESTAMP will then be put into the same batch + if (kv.updateLatestStamp(nowBytes)) { + ts = now; + } + Batch batch = batches.get(ts); + if (batch == null) { + batch = new Batch(ts); + batches.put(ts, batch); + } + batch.add(kv); + } + } + + /** + * For a single batch, get all the index updates and add them to the updateMap + * <p> + * This method manages cleaning up the entire history of the row from the given timestamp forward for out-of-order + * (e.g. 'back in time') updates. + * <p> + * If things arrive out of order (client is using custom timestamps) we should still see the index in the correct + * order (assuming we scan after the out-of-order update is finished). Therefore, when we aren't the most recent + * update to the index, we need to delete the state at the current timestamp (similar to above), but also issue a + * delete for the added index updates at the next newest timestamp of any of the columns in the update; we need to + * cleanup the insert so it looks like it was also deleted at that next newest timestamp. However, it's not enough to + * just update the one in front of us - that column will likely be applied to index entries up the entire history in + * front of us, which also needs to be fixed up. + * <p> + * However, the current update usually will be the most recent thing to be added. 
In that case, all we need to do is + * issue a delete for the previous index row (the state of the row, without the update applied) at the current + * timestamp. This gets rid of anything currently in the index for the current state of the row (at the timestamp). + * Then we can just follow that by applying the pending update and building the index update based on the new row + * state. + * + * @param updateMap + * map to update with new index elements + * @param batch + * timestamp-based batch of edits + * @param state + * local state to update and pass to the codec + * @param requireCurrentStateCleanup + * <tt>true</tt> if we should attempt to cleanup the current state of the table, in the event of a + * 'back in time' batch. <tt>false</tt> indicates we should not attempt the cleanup, e.g. an earlier + * batch already did the cleanup. + * @param indexMetaData TODO + * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put), <tt>false</tt> + * otherwise + * @throws IOException + */ + private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch, LocalTableState state, + boolean requireCurrentStateCleanup, IndexMetaData indexMetaData) throws IOException { + + // need a temporary manager for the current batch. It should resolve any conflicts for the + // current batch. Essentially, we can get the case where a batch doesn't change the current + // state of the index (all Puts are covered by deletes), in which case we don't want to add + // anything + // A. Get the correct values for the pending state in the batch + // A.1 start by cleaning up the current state - as long as there are key-values in the batch + // that are indexed, we need to change the current state of the index. It's up to the codec to + // determine if we need to make any cleanup given the pending update. 
+ long batchTs = batch.getTimestamp(); + state.setPendingUpdates(batch.getKvs()); + addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData); + + // A.2 do a single pass first for the updates to the current state + state.applyPendingUpdates(); + long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData); + // if all the updates are the latest thing in the index, we are done - don't go and fix history + if (ColumnTracker.isNewestTime(minTs)) { return false; } + + // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the + // index. after this, we have the correct view of the index, from the batch up to the index + while (!ColumnTracker.isNewestTime(minTs)) { + minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, indexMetaData); + } + + // B. only cleanup the current state if we need to - its a huge waste of effort otherwise. + if (requireCurrentStateCleanup) { + // roll back the pending update. This is needed so we can remove all the 'old' index entries. + // We don't need to do the puts here, but just the deletes at the given timestamps since we + // just want to completely hide the incorrect entries. + state.rollback(batch.getKvs()); + // setup state + state.setPendingUpdates(batch.getKvs()); + + // cleanup the pending batch. If anything in the correct history is covered by Deletes used to + // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both + // because the update may have a different set of columns or value based on the update). 
+ cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, indexMetaData); + + // have to roll the state forward again, so the current state is correct + state.applyPendingUpdates(); + return true; + } + return false; + } + + private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap, IndexMetaData indexMetaData) + throws IOException { + state.setCurrentTimestamp(ts); + ts = addCurrentStateMutationsForBatch(updateMap, state, indexMetaData); + return ts; + } + + private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData) + throws IOException { + // get the cleanup for the current state + state.setCurrentTimestamp(batchTs); + addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData); + // ignore any index tracking from the delete + state.resetTrackedColumns(); + } + + /** + * Add the necessary mutations for the pending batch on the local state. Handles rolling up through history to + * determine the index changes after applying the batch (for the case where the batch is back in time). + * + * @param updateMap + * to update with index mutations + * @param state + * current state of the table + * @param indexMetaData TODO + * @param batch + * to apply to the current state + * @return the minimum timestamp across all index columns requested. If {@link ColumnTracker#isNewestTime(long)} + * returns <tt>true</tt> on the returned timestamp, we know that this <i>was not a back-in-time update</i>. + * @throws IOException + */ + private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state, IndexMetaData indexMetaData) + throws IOException { + + // get the index updates for this current batch + Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData); + state.resetTrackedColumns(); + + /* + * go through all the pending updates. 
If we are sure that all the entries are the latest timestamp, we can just + * add the index updates and move on. However, if there are columns that we skip past (based on the timestamp of + * the batch), we need to roll back up the history. Regardless of whether or not they are the latest timestamp, + * the entries here are going to be correct for the current batch timestamp, so we add them to the updates. The + * only thing we really care about is if we need to roll up the history and fix it as we go. + */ + // timestamp of the next update we need to track + long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP; + List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>(); + for (IndexUpdate update : upserts) { + // this is the one bit where we check the timestamps + final ColumnTracker tracker = update.getIndexedColumns(); + long trackerTs = tracker.getTS(); + // update the next min TS we need to track + if (trackerTs < minTs) { + minTs = tracker.getTS(); + } + // track index hints for the next round. Hint if we need an update for that column for the + // next timestamp. These columns clearly won't need to update as we go through time as they + // already match the most recent possible thing. + boolean needsCleanup = false; + if (tracker.hasNewerTimestamps()) { + columnHints.add(tracker); + // this update also needs to be cleaned up at the next timestamp because it is not the latest. + needsCleanup = true; + } + + // only make the put if the index update has been setup + if (update.isValid()) { + byte[] table = update.getTableName(); + Mutation mutation = update.getUpdate(); + updateMap.addIndexUpdate(table, mutation); + + // only make the cleanup if we made a put and need cleanup + if (needsCleanup) { + // there is a TS for the interested columns that is greater than the columns in the + // put. 
Therefore, we need to issue a delete at the same timestamp + Delete d = new Delete(mutation.getRow()); + d.setTimestamp(tracker.getTS()); + updateMap.addIndexUpdate(table, d); + } + } + } + return minTs; + } + + /** + * Cleanup the index based on the current state from the given batch. Iterates over each timestamp (for the indexed + * rows) for the current state of the table and cleans up all the existing entries generated by the codec. + * <p> + * Adds all pending updates to the updateMap + * + * @param updateMap + * updated with the pending index updates from the codec + * @param batchTs + * timestamp from which we should cleanup + * @param state + * current state of the primary table. Should already be set up to the correct state from which we want to + * cleanup. + * @param indexMetaData TODO + * @throws IOException + */ + private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData) + throws IOException { + // get the cleanup for the current state + state.setCurrentTimestamp(batchTs); + addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData); + Set<ColumnTracker> trackers = state.getTrackedColumns(); + long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP; + for (ColumnTracker tracker : trackers) { + if (tracker.getTS() < minTs) { + minTs = tracker.getTS(); + } + } + state.resetTrackedColumns(); + if (!ColumnTracker.isNewestTime(minTs)) { + state.setHints(Lists.newArrayList(trackers)); + cleanupIndexStateFromBatchOnward(updateMap, minTs, state, indexMetaData); + } + } + + /** + * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the + * update map. + * <p> + * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates applied, etc). 
+ * @param indexMetaData TODO + * + * @throws IOException + */ + protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData) + throws IOException { + Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData); + if (cleanup != null) { + for (IndexUpdate d : cleanup) { + if (!d.isValid()) { + continue; + } + // override the timestamps in the delete to match the current batch. + Delete remove = (Delete)d.getUpdate(); + remove.setTimestamp(ts); + updateMap.addIndexUpdate(d.getTableName(), remove); + } + } + } + + @Override + public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered, IndexMetaData indexMetaData) + throws IOException { + // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows + return null; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java index d8b215c,4c4d0b0..0e961db --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java @@@ -23,7 -23,7 +23,8 @@@ import java.util.Collection import java.util.List; import java.util.Map; +import org.apache.hadoop.hbase.Cell; + import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Pair; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java index 59bc8de,4b953e6..4efca9f --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java @@@ -38,327 -44,325 +38,327 @@@ import com.google.common.collect.Lists */ public class CoveredColumnIndexCodec extends BaseIndexCodec { - private static final byte[] EMPTY_BYTES = new byte[0]; - public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS"); + private static final byte[] EMPTY_BYTES = new byte[0]; + public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS"); - private List<ColumnGroup> groups; + private List<ColumnGroup> groups; - /** - * @param groups to initialize the codec with - * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing - * purposes - */ - public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) { - CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec(); - codec.groups = Lists.newArrayList(groups); - return codec; - } - - @Override - public void initialize(RegionCoprocessorEnvironment env) { - groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration()); - } - - @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state) { - List<IndexUpdate> updates = new ArrayList<IndexUpdate>(); - for (ColumnGroup group : groups) { - IndexUpdate update = getIndexUpdateForGroup(group, state); - updates.add(update); + /** + * @param groups + * to initialize the codec with + * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing purposes + */ + public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) { + CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec(); + codec.groups = 
Lists.newArrayList(groups); + return codec; } - return updates; - } - /** - * @param group - * @param state - * @return the update that should be made to the table - */ - private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) { - List<CoveredColumn> refs = group.getColumns(); - try { - Pair<Scanner, IndexUpdate> stateInfo = state.getIndexedColumnsTableState(refs); - Scanner kvs = stateInfo.getFirst(); - Pair<Integer, List<ColumnEntry>> columns = - getNextEntries(refs, kvs, state.getCurrentRowKey()); - // make sure we close the scanner - kvs.close(); - if (columns.getFirst().intValue() == 0) { - return stateInfo.getSecond(); - } - // have all the column entries, so just turn it into a Delete for the row - // convert the entries to the needed values - byte[] rowKey = - composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond()); - Put p = new Put(rowKey, state.getCurrentTimestamp()); - // add the columns to the put - addColumnsToPut(p, columns.getSecond()); - - // update the index info - IndexUpdate update = stateInfo.getSecond(); - update.setTable(Bytes.toBytes(group.getTable())); - update.setUpdate(p); - return update; - } catch (IOException e) { - throw new RuntimeException("Unexpected exception when getting state for columns: " + refs); + @Override + public void initialize(RegionCoprocessorEnvironment env) { + groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration()); } - } - private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) { - // add each of the corresponding families to the put - int count = 0; - for (ColumnEntry column : columns) { - indexInsert.add(INDEX_ROW_COLUMN_FAMILY, - ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null); - } + @Override + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) { + List<IndexUpdate> updates = new ArrayList<IndexUpdate>(); + for (ColumnGroup group : groups) { + 
IndexUpdate update = getIndexUpdateForGroup(group, state); + updates.add(update); + } + return updates; - } + } - private static byte[] toIndexQualifier(CoveredColumn column) { - return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR), - column.getQualifier()); - } + /** + * @param group + * @param state + * @return the update that should be made to the table + */ + private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) { + List<CoveredColumn> refs = group.getColumns(); + try { + Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs); + Scanner kvs = stateInfo.getFirst(); + Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey()); + // make sure we close the scanner + kvs.close(); + if (columns.getFirst().intValue() == 0) { return stateInfo.getSecond(); } + // have all the column entries, so just turn it into a Delete for the row + // convert the entries to the needed values + byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond()); + Put p = new Put(rowKey, state.getCurrentTimestamp()); + // add the columns to the put + addColumnsToPut(p, columns.getSecond()); + + // update the index info + IndexUpdate update = stateInfo.getSecond(); + update.setTable(Bytes.toBytes(group.getTable())); + update.setUpdate(p); + return update; + } catch (IOException e) { + throw new RuntimeException("Unexpected exception when getting state for columns: " + refs); + } + } - @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state) { - List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(); - for (ColumnGroup group : groups) { - deletes.add(getDeleteForGroup(group, state)); + private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) { + // add each of the corresponding families to the put + int count = 0; + for (ColumnEntry column : columns) { + 
indexInsert.add(INDEX_ROW_COLUMN_FAMILY, + ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null); + } } - return deletes; - } + private static byte[] toIndexQualifier(CoveredColumn column) { + return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR), column.getQualifier()); + } - /** - * Get all the deletes necessary for a group of columns - logically, the cleanup the index table - * for a given index. - * @param group index information - * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary - */ - private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) { - List<CoveredColumn> refs = group.getColumns(); - try { - Pair<Scanner, IndexUpdate> kvs = state.getIndexedColumnsTableState(refs); - Pair<Integer, List<ColumnEntry>> columns = - getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey()); - // make sure we close the scanner reference - kvs.getFirst().close(); - // no change, just return the passed update - if (columns.getFirst() == 0) { - return kvs.getSecond(); - } - // have all the column entries, so just turn it into a Delete for the row - // convert the entries to the needed values - byte[] rowKey = - composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond()); - Delete d = new Delete(rowKey); - d.setTimestamp(state.getCurrentTimestamp()); - IndexUpdate update = kvs.getSecond(); - update.setUpdate(d); - update.setTable(Bytes.toBytes(group.getTable())); - return update; - } catch (IOException e) { - throw new RuntimeException("Unexpected exception when getting state for columns: " + refs); + @Override + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) { + List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(); + for (ColumnGroup group : groups) { + deletes.add(getDeleteForGroup(group, state)); + } + return deletes; } - } - /** - * Get the next batch of primary table values for the given 
columns - * @param refs columns to match against - * @param kvs - * @param currentRow - * @return the total length of all values found and the entries to add for the index - */ - private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs, - byte[] currentRow) throws IOException { - int totalValueLength = 0; - List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size()); - - // pull out the latest state for each column reference, in order - for (CoveredColumn ref : refs) { - KeyValue first = ref.getFirstKeyValueForRow(currentRow); - if (!kvs.seek(first)) { - // no more keys, so add a null value - entries.add(new ColumnEntry(null, ref)); - continue; - } - // there is a next value - we only care about the current value, so we can just snag that - Cell next = kvs.next(); - if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) { - byte[] v = next.getValue(); - totalValueLength += v.length; - entries.add(new ColumnEntry(v, ref)); - } else { - // this first one didn't match at all, so we have to put in a null entry - entries.add(new ColumnEntry(null, ref)); - continue; - } - // here's where is gets a little tricky - we either need to decide if we should continue - // adding entries (matches all qualifiers) or if we are done (matches a single qualifier) - if (!ref.allColumns()) { - continue; - } - // matches all columns, so we need to iterate until we hit the next column with the same - // family as the current key - byte[] lastQual = next.getQualifier(); - byte[] nextQual = null; - while ((next = kvs.next()) != null) { - // different family, done with this column - if (!ref.matchesFamily(next.getFamily())) { - break; + /** + * Get all the deletes necessary for a group of columns - logically, the cleanup the index table for a given index. 
+ * + * @param group + * index information + * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary + */ + private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) { + List<CoveredColumn> refs = group.getColumns(); + try { + Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs); + Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey()); + // make sure we close the scanner reference + kvs.getFirst().close(); + // no change, just return the passed update + if (columns.getFirst() == 0) { return kvs.getSecond(); } + // have all the column entries, so just turn it into a Delete for the row + // convert the entries to the needed values + byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond()); + Delete d = new Delete(rowKey); + d.setTimestamp(state.getCurrentTimestamp()); + IndexUpdate update = kvs.getSecond(); + update.setUpdate(d); + update.setTable(Bytes.toBytes(group.getTable())); + return update; + } catch (IOException e) { + throw new RuntimeException("Unexpected exception when getting state for columns: " + refs); } - nextQual = next.getQualifier(); - // we are still on the same qualifier - skip it, since we already added a column for it - if (Arrays.equals(lastQual, nextQual)) { - continue; + } + + /** + * Get the next batch of primary table values for the given columns + * + * @param refs + * columns to match against + * @param state + * @return the total length of all values found and the entries to add for the index + */ + private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs, byte[] currentRow) + throws IOException { + int totalValueLength = 0; + List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size()); + + // pull out the latest state for each column reference, in order + for (CoveredColumn ref : refs) { + KeyValue first = 
ref.getFirstKeyValueForRow(currentRow); + if (!kvs.seek(first)) { + // no more keys, so add a null value + entries.add(new ColumnEntry(null, ref)); + continue; + } + // there is a next value - we only care about the current value, so we can just snag that + Cell next = kvs.next(); + if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) { + byte[] v = next.getValue(); + totalValueLength += v.length; + entries.add(new ColumnEntry(v, ref)); + } else { + // this first one didn't match at all, so we have to put in a null entry + entries.add(new ColumnEntry(null, ref)); + continue; + } + // here's where is gets a little tricky - we either need to decide if we should continue + // adding entries (matches all qualifiers) or if we are done (matches a single qualifier) + if (!ref.allColumns()) { + continue; + } + // matches all columns, so we need to iterate until we hit the next column with the same + // family as the current key + byte[] lastQual = next.getQualifier(); + byte[] nextQual = null; + while ((next = kvs.next()) != null) { + // different family, done with this column + if (!ref.matchesFamily(next.getFamily())) { + break; + } + nextQual = next.getQualifier(); + // we are still on the same qualifier - skip it, since we already added a column for it + if (Arrays.equals(lastQual, nextQual)) { + continue; + } + // this must match the qualifier since its an all-qualifiers specifier, so we add it + byte[] v = next.getValue(); + totalValueLength += v.length; + entries.add(new ColumnEntry(v, ref)); + // update the last qualifier to check against + lastQual = nextQual; + } } - // this must match the qualifier since its an all-qualifiers specifier, so we add it - byte[] v = next.getValue(); - totalValueLength += v.length; - entries.add(new ColumnEntry(v, ref)); - // update the last qualifier to check against - lastQual = nextQual; - } + return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries); } - return new Pair<Integer, 
List<ColumnEntry>>(totalValueLength, entries); - } - static class ColumnEntry { - byte[] value = EMPTY_BYTES; - CoveredColumn ref; + static class ColumnEntry { + byte[] value = EMPTY_BYTES; + CoveredColumn ref; - public ColumnEntry(byte[] value, CoveredColumn ref) { - this.value = value == null ? EMPTY_BYTES : value; - this.ref = ref; + public ColumnEntry(byte[] value, CoveredColumn ref) { + this.value = value == null ? EMPTY_BYTES : value; + this.ref = ref; + } } - } - /** - * Compose the final index row key. - * <p> - * This is faster than adding each value independently as we can just build a single a array and - * copy everything over once. - * @param pk primary key of the original row - * @param length total number of bytes of all the values that should be added - * @param values to use when building the key - */ - static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) { - // now build up expected row key, each of the values, in order, followed by the PK and then some - // info about lengths so we can deserialize each value - byte[] output = new byte[length + pk.length]; - int pos = 0; - int[] lengths = new int[values.size()]; - int i = 0; - for (ColumnEntry entry : values) { - byte[] v = entry.value; - // skip doing the copy attempt, if we don't need to - if (v.length != 0) { - System.arraycopy(v, 0, output, pos, v.length); - pos += v.length; - } - lengths[i++] = v.length; - } + /** + * Compose the final index row key. + * <p> + * This is faster than adding each value independently as we can just build a single a array and copy everything + * over once. 
+ * + * @param pk + * primary key of the original row + * @param length + * total number of bytes of all the values that should be added + * @param values + * to use when building the key + */ + static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) { + // now build up expected row key, each of the values, in order, followed by the PK and then some + // info about lengths so we can deserialize each value + byte[] output = new byte[length + pk.length]; + int pos = 0; + int[] lengths = new int[values.size()]; + int i = 0; + for (ColumnEntry entry : values) { + byte[] v = entry.value; + // skip doing the copy attempt, if we don't need to + if (v.length != 0) { + System.arraycopy(v, 0, output, pos, v.length); + pos += v.length; + } + lengths[i++] = v.length; + } + + // add the primary key to the end of the row key + System.arraycopy(pk, 0, output, pos, pk.length); - // add the primary key to the end of the row key - System.arraycopy(pk, 0, output, pos, pk.length); + // add the lengths as suffixes so we can deserialize the elements again + for (int l : lengths) { + output = ArrayUtils.addAll(output, Bytes.toBytes(l)); + } - // add the lengths as suffixes so we can deserialize the elements again - for (int l : lengths) { - output = ArrayUtils.addAll(output, Bytes.toBytes(l)); + // and the last integer is the number of values + return ArrayUtils.addAll(output, Bytes.toBytes(values.size())); } - // and the last integer is the number of values - return ArrayUtils.addAll(output, Bytes.toBytes(values.size())); - } + /** + * Essentially a short-cut from building a {@link Put}. + * + * @param pk + * row key + * @param timestamp + * timestamp of all the keyvalues + * @param values + * expected value--column pair + * @return a keyvalues that the index contains for a given row at a timestamp with the given value -- column pairs. 
+ */ + public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp, + List<Pair<byte[], CoveredColumn>> values) { + + int length = 0; + List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size()); + for (Pair<byte[], CoveredColumn> value : values) { + ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond()); + length += value.getFirst().length; + expected.add(entry); + } - /** - * Essentially a short-cut from building a {@link Put}. - * @param pk row key - * @param timestamp timestamp of all the keyvalues - * @param values expected value--column pair - * @return a keyvalues that the index contains for a given row at a timestamp with the given value - * -- column pairs. - */ - public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp, - List<Pair<byte[], CoveredColumn>> values) { - - int length = 0; - List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size()); - for (Pair<byte[], CoveredColumn> value : values) { - ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond()); - length += value.getFirst().length; - expected.add(entry); - } - - byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected); - Put p = new Put(rowKey, timestamp); - CoveredColumnIndexCodec.addColumnsToPut(p, expected); - List<KeyValue> kvs = new ArrayList<KeyValue>(); - for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) { - kvs.addAll(entry.getValue()); - } - - return kvs; - } + byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected); + Put p = new Put(rowKey, timestamp); + CoveredColumnIndexCodec.addColumnsToPut(p, expected); + List<KeyValue> kvs = new ArrayList<KeyValue>(); + for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) { + kvs.addAll(entry.getValue()); + } - public static List<byte[]> getValues(byte[] bytes) { - // get the total number of keys in the bytes - int keyCount = 
CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length); - List<byte[]> keys = new ArrayList<byte[]>(keyCount); - int[] lengths = new int[keyCount]; - int lengthPos = keyCount - 1; - int pos = bytes.length - Bytes.SIZEOF_INT; - // figure out the length of each key - for (int i = 0; i < keyCount; i++) { - lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos); - pos -= Bytes.SIZEOF_INT; + return kvs; } - int current = 0; - for (int length : lengths) { - byte[] key = Arrays.copyOfRange(bytes, current, current + length); - keys.add(key); - current += length; - } + public static List<byte[]> getValues(byte[] bytes) { + // get the total number of keys in the bytes + int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length); + List<byte[]> keys = new ArrayList<byte[]>(keyCount); + int[] lengths = new int[keyCount]; + int lengthPos = keyCount - 1; + int pos = bytes.length - Bytes.SIZEOF_INT; + // figure out the length of each key + for (int i = 0; i < keyCount; i++) { + lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos); + pos -= Bytes.SIZEOF_INT; + } - return keys; - } + int current = 0; + for (int length : lengths) { + byte[] key = Arrays.copyOfRange(bytes, current, current + length); + keys.add(key); + current += length; + } - /** - * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes - * @param bytes array to read from - * @param start start point, backwards from which to read. For example, if specifying "25", we - * would try to read an integer from 21 -> 25 - * @return an integer from the proceeding {@value Bytes#SIZEOF_INT} bytes, if it exists. - */ - private static int getPreviousInteger(byte[] bytes, int start) { - return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT); - } + return keys; + } - /** - * Check to see if an row key just contains a list of null values. 
- * @param bytes row key to examine - * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise - */ - public static boolean checkRowKeyForAllNulls(byte[] bytes) { - int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length); - int pos = bytes.length - Bytes.SIZEOF_INT; - for (int i = 0; i < keyCount; i++) { - int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos); - if (next > 0) { - return false; - } - pos -= Bytes.SIZEOF_INT; + /** + * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes + * + * @param bytes + * array to read from + * @param start + * start point, backwards from which to read. For example, if specifying "25", we would try to read an + * integer from 21 -> 25 + * @return an integer from the proceeding {@value Bytes#SIZEOF_INT} bytes, if it exists. + */ + private static int getPreviousInteger(byte[] bytes, int start) { + return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT); } - return true; - } + /** + * Check to see if an row key just contains a list of null values. + * + * @param bytes + * row key to examine + * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise + */ + public static boolean checkRowKeyForAllNulls(byte[] bytes) { + int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length); + int pos = bytes.length - Bytes.SIZEOF_INT; + for (int i = 0; i < keyCount; i++) { + int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos); + if (next > 0) { return false; } + pos -= Bytes.SIZEOF_INT; + } - @Override - public boolean isEnabled(Mutation m) { - // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this - // simple check for the moment. - return groups.size() > 0; - } + return true; + } + + @Override + public boolean isEnabled(Mutation m) { + // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this + // simple check for the moment. 
+ return groups.size() > 0; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java index 362ef18,ff33ec2..e120268 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java @@@ -137,10 -136,10 +137,10 @@@ public class ScannerBuilder } @Override - public boolean seek(KeyValue next) throws IOException { + public boolean seek(Cell next) throws IOException { // check to see if the next kv is after the current key, in which case we can use reseek, // which will be more efficient - KeyValue peek = kvScanner.peek(); + Cell peek = kvScanner.peek(); // there is another value and its before the requested one - we can do a reseek! 
if (peek != null) { int compare = KeyValue.COMPARATOR.compare(peek, next); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index bd0461d,b060345..1f64003 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@@ -301,7 -270,7 +307,8 @@@ public class IndexMaintainer implement private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection connection) { this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null); + assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() == PTableType.TABLE || dataTable.getType() == PTableType.VIEW); + this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable(); this.isMultiTenant = dataTable.isMultiTenant(); this.viewIndexId = index.getViewIndexId() == null ? 
null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId()); this.isLocalIndex = index.getIndexType() == IndexType.LOCAL; @@@ -820,31 -827,17 +866,31 @@@ return put; } - public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) { + private enum DeleteType {SINGLE_VERSION, ALL_VERSIONS}; - private DeleteType getDeleteTypeOrNull(Collection<Cell> pendingUpdates) { ++ private DeleteType getDeleteTypeOrNull(Collection<KeyValue> pendingUpdates) { int nDeleteCF = 0; + int nDeleteVersionCF = 0; - for (Cell kv : pendingUpdates) { + for (KeyValue kv : pendingUpdates) { - if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { - nDeleteCF++; + if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) { + nDeleteVersionCF++; } + else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() + // Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor + // TODO see if implement PhoenixTransactionalIndexer.preDelete will work instead of the following check + || (CellUtil.matchingQualifier(kv, TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) { + nDeleteCF++; + } } - return nDeleteCF == this.nDataCFs && nDeleteCF > 0; + // This is what a delete looks like on the server side for mutable indexing... + // Should all be one or the other for DeleteFamily versus DeleteFamilyVersion, but just in case not + return nDeleteVersionCF >= this.nDataCFs ? DeleteType.SINGLE_VERSION : nDeleteCF + nDeleteVersionCF >= this.nDataCFs ? 
DeleteType.ALL_VERSIONS : null; + } + - public boolean isRowDeleted(Collection<Cell> pendingUpdates) { ++ public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) { + return getDeleteTypeOrNull(pendingUpdates) != null; } - private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<Cell> pendingUpdates) throws IOException { + private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<KeyValue> pendingUpdates) throws IOException { if (pendingUpdates.isEmpty()) { return false; } @@@ -884,25 -877,11 +930,25 @@@ } @SuppressWarnings("deprecation") - public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { + public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey); // Delete the entire row if any of the indexed columns changed - if (oldState == null || isRowDeleted(pendingUpdates) || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row - Delete delete = new Delete(indexRowKey, ts); + DeleteType deleteType = null; + if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row + byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary(); + Delete delete = new Delete(indexRowKey); + // If table delete was single version, then index delete should be as well + if (deleteType == DeleteType.SINGLE_VERSION) { + for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set<byte[]> for index CFs? 
+ delete.deleteFamilyVersion(ref.getFamily(), ts); + } + delete.deleteFamilyVersion(emptyCF, ts); + } else { + for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set<byte[]> for index CFs? + delete.deleteFamily(ref.getFamily(), ts); + } + delete.deleteFamily(emptyCF, ts); + } delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); return delete; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index 09a9f90,3ef01fe..0601e0a --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@@ -28,20 -26,15 +27,18 @@@ import org.apache.hadoop.conf.Configura import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; + import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.phoenix.compile.ScanRanges; -import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; +import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; +import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.types.PVarbinary; - import org.apache.phoenix.util.ScanUtil; - import 
org.apache.phoenix.util.SchemaUtil; import com.google.common.collect.Lists; @@@ -93,11 -67,10 +90,11 @@@ public class PhoenixIndexBuilder extend } if (maintainers.isEmpty()) return; Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values())); + scan.setRaw(true); - ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN); + ScanRanges scanRanges = ScanRanges.createPointLookup(keys); scanRanges.initializeScan(scan); scan.setFilter(scanRanges.getSkipScanFilter()); - HRegion region = env.getRegion(); - Region region = this.env.getRegion(); ++ Region region = env.getRegion(); RegionScanner scanner = region.getScanner(scan); // Run through the scanner using internal nextRaw method region.startRegionOperation(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 2719119,222aefb..7acc90c --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@@ -14,9 -23,8 +14,9 @@@ import java.util.Collections import java.util.List; import java.util.Map; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Delete; - import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; + import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Pair; @@@ -58,45 -113,71 +58,45 @@@ public class PhoenixIndexCodec extends } @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws 
IOException { - return getIndexUpdates(state, false); - } - - /** - * - * @param state - * @param upsert prepare index upserts if it's true otherwise prepare index deletes. - * @return - * @throws IOException - */ - private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean upsert) throws IOException { - List<IndexMaintainer> indexMaintainers = getIndexMaintainers(state.getUpdateAttributes()); - if (indexMaintainers.isEmpty()) { + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException { + List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers(); + if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) { return Collections.emptyList(); } + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + ptr.set(state.getCurrentRowKey()); List<IndexUpdate> indexUpdates = Lists.newArrayList(); + for (IndexMaintainer maintainer : indexMaintainers) { + Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns()); + ValueGetter valueGetter = statePair.getFirst(); + IndexUpdate indexUpdate = statePair.getSecond(); + indexUpdate.setTable(maintainer.getIndexTableName()); + Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env - .getRegion().getStartKey(), env.getRegion().getEndKey()); ++ .getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey()); + indexUpdate.setUpdate(put); + indexUpdates.add(indexUpdate); + } + return indexUpdates; + } + + @Override + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException { + List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers(); ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - // TODO: state.getCurrentRowKey() should take an ImmutableBytesWritable arg to prevent byte copy - byte[] dataRowKey = 
state.getCurrentRowKey(); - ptr.set(dataRowKey); - byte[] localIndexTableName = MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName()); - ValueGetter valueGetter = null; - Scanner scanner = null; + ptr.set(state.getCurrentRowKey()); + List<IndexUpdate> indexUpdates = Lists.newArrayList(); for (IndexMaintainer maintainer : indexMaintainers) { - if(upsert) { - // Short-circuit building state when we know it's a row deletion - if (maintainer.isRowDeleted(state.getPendingUpdate())) { - continue; - } - } - IndexUpdate indexUpdate = null; - if (maintainer.isImmutableRows()) { - indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns())); - if(maintainer.isLocalIndex()) { - indexUpdate.setTable(localIndexTableName); - } else { - indexUpdate.setTable(maintainer.getIndexTableName()); - } - valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate()); - } else { - // TODO: if more efficient, I could do this just once with all columns in all indexes - Pair<Scanner,IndexUpdate> statePair = state.getIndexedColumnsTableState(maintainer.getAllColumns()); - scanner = statePair.getFirst(); - indexUpdate = statePair.getSecond(); - indexUpdate.setTable(maintainer.getIndexTableName()); - valueGetter = IndexManagementUtil.createGetterFromScanner(scanner, dataRowKey); - } - Mutation mutation = null; - if (upsert) { - mutation = - maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state.getCurrentTimestamp(), - env.getRegion().getRegionInfo().getStartKey(), - env.getRegion().getRegionInfo().getEndKey()); - } else { - mutation = - maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state.getPendingUpdate(), - state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(), - env.getRegion().getRegionInfo().getEndKey()); - } - indexUpdate.setUpdate(mutation); - if (scanner != null) { - scanner.close(); - scanner = null; - } + // For transactional tables, we use an index maintainer + // to aid 
in rollback if there's a KeyValue column in the index. The alternative would be + // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the + // client side. + Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns()); + ValueGetter valueGetter = statePair.getFirst(); + IndexUpdate indexUpdate = statePair.getSecond(); + indexUpdate.setTable(maintainer.getIndexTableName()); + Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(), - state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey()); ++ state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey()); + indexUpdate.setUpdate(delete); indexUpdates.add(indexUpdate); } return indexUpdates;
