Secondary indexing with txns
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5a558e16 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5a558e16 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5a558e16 Branch: refs/heads/txn Commit: 5a558e16cd7b1e882b274683aff6b655f952dac1 Parents: 826ebf5 Author: James Taylor <[email protected]> Authored: Fri Mar 27 10:35:37 2015 -0700 Committer: James Taylor <[email protected]> Committed: Fri Mar 27 10:35:37 2015 -0700 ---------------------------------------------------------------------- .../EndToEndCoveredColumnsIndexBuilderIT.java | 12 +- .../covered/example/FailWithoutRetriesIT.java | 198 +++--- .../apache/phoenix/hbase/index/ValueGetter.java | 4 +- .../hbase/index/builder/BaseIndexBuilder.java | 170 +++-- .../hbase/index/builder/IndexBuildManager.java | 14 - .../hbase/index/builder/IndexBuilder.java | 47 +- .../phoenix/hbase/index/covered/Batch.java | 11 +- .../covered/CoveredColumnsIndexBuilder.java | 491 --------------- .../phoenix/hbase/index/covered/IndexCodec.java | 172 +++-- .../hbase/index/covered/LocalTableState.java | 453 +++++++------ .../hbase/index/covered/NonTxIndexBuilder.java | 405 ++++++++++++ .../phoenix/hbase/index/covered/TableState.java | 45 +- .../hbase/index/covered/TxIndexBuilder.java | 247 ++++++++ .../index/covered/data/LazyValueGetter.java | 18 +- .../example/CoveredColumnIndexCodec.java | 628 +++++++++---------- .../CoveredColumnIndexSpecifierBuilder.java | 4 +- .../covered/example/CoveredColumnIndexer.java | 9 +- .../index/covered/update/ColumnTracker.java | 3 +- .../covered/update/IndexUpdateManager.java | 5 +- .../hbase/index/scanner/EmptyScanner.java | 8 +- .../phoenix/hbase/index/scanner/Scanner.java | 7 +- .../hbase/index/scanner/ScannerBuilder.java | 12 +- .../hbase/index/write/IndexFailurePolicy.java | 4 +- .../apache/phoenix/index/IndexMaintainer.java | 28 +- .../phoenix/index/PhoenixIndexBuilder.java | 90 +-- 
.../apache/phoenix/index/PhoenixIndexCodec.java | 145 ++--- .../index/PhoenixIndexFailurePolicy.java | 2 +- .../phoenix/index/PhoenixTxIndexBuilder.java | 53 ++ .../index/PhoenixTxIndexFailurePolicy.java | 50 ++ .../query/ConnectionQueryServicesImpl.java | 10 +- .../phoenix/schema/tuple/ValueGetterTuple.java | 16 +- .../java/org/apache/phoenix/util/IndexUtil.java | 64 +- .../covered/CoveredIndexCodecForTesting.java | 93 ++- .../index/covered/TestLocalTableState.java | 20 +- .../example/TestCoveredColumnIndexCodec.java | 16 +- .../phoenix/index/IndexMaintainerTest.java | 2 +- 36 files changed, 1904 insertions(+), 1652 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java index 84c6827..fa85f00 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java @@ -59,7 +59,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; /** - * End-to-End test of just the {@link CoveredColumnsIndexBuilder}, but with a simple + * End-to-End test of just the {@link NonTxIndexBuilder}, but with a simple * {@link IndexCodec} and BatchCache implementation. 
*/ @Category(NeedsOwnMiniClusterTest.class) @@ -151,7 +151,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT { ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns)).getFirst(); int count = 0; - KeyValue kv; + Cell kv; while ((kv = kvs.next()) != null) { Cell next = expectedKvs.get(count++); assertEquals( @@ -302,9 +302,9 @@ public class EndToEndCoveredColumnsIndexBuilderIT { Map<String, String> indexerOpts = new HashMap<String, String>(); // just need to set the codec - we are going to set it later, but we need something here or the // initializer blows up. - indexerOpts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, + indexerOpts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, CoveredIndexCodecForTesting.class.getName()); - Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER); + Indexer.enableIndexing(desc, NonTxIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER); // create the table HBaseAdmin admin = UTIL.getHBaseAdmin(); @@ -315,8 +315,8 @@ public class EndToEndCoveredColumnsIndexBuilderIT { HRegion region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0); Indexer indexer = (Indexer) region.getCoprocessorHost().findCoprocessor(Indexer.class.getName()); - CoveredColumnsIndexBuilder builder = - (CoveredColumnsIndexBuilder) indexer.getBuilderForTesting(); + NonTxIndexBuilder builder = + (NonTxIndexBuilder) indexer.getBuilderForTesting(); VerifyingIndexCodec codec = new VerifyingIndexCodec(); builder.setIndexCodecForTesting(codec); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java 
index dbe78e7..281ad63 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java @@ -1,19 +1,11 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. 
*/ package org.apache.phoenix.hbase.index.covered.example; @@ -32,6 +24,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.util.Bytes; @@ -49,105 +42,106 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; - /** - * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String} - * constructor), {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize - * the exception, and just return <tt>null</tt> to the client, which then just goes and retries. + * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String} constructor), + * {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize the exception, and just return + * <tt>null</tt> to the client, which then just goes and retries. 
*/ @Category(NeedsOwnMiniClusterTest.class) public class FailWithoutRetriesIT { - private static final Log LOG = LogFactory.getLog(FailWithoutRetriesIT.class); - @Rule - public TableName table = new TableName(); + private static final Log LOG = LogFactory.getLog(FailWithoutRetriesIT.class); + @Rule + public TableName table = new TableName(); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private String getIndexTableName() { + return Bytes.toString(table.getTableName()) + "_index"; + } - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + public static class FailingTestCodec extends BaseIndexCodec { - private String getIndexTableName() { - return Bytes.toString(table.getTableName()) + "_index"; - } + @Override + public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException { + throw new RuntimeException("Intentionally failing deletes for " + FailWithoutRetriesIT.class.getName()); + } - public static class FailingTestCodec extends BaseIndexCodec { + @Override + public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException { + throw new RuntimeException("Intentionally failing upserts for " + FailWithoutRetriesIT.class.getName()); + } + + @Override + public void setContext(TableState state, Mutation mutation) throws IOException {} + + } - @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException { - throw new RuntimeException("Intentionally failing deletes for " - + FailWithoutRetriesIT.class.getName()); + @BeforeClass + public static void setupCluster() throws Exception { + // setup and verify the config + Configuration conf = UTIL.getConfiguration(); + setUpConfigForMiniCluster(conf); + IndexTestingUtils.setupConfig(conf); + IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf); + // start the cluster + UTIL.startMiniCluster(); } - @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws 
IOException { - throw new RuntimeException("Intentionally failing upserts for " - + FailWithoutRetriesIT.class.getName()); + @AfterClass + public static void teardownCluster() throws Exception { + UTIL.shutdownMiniCluster(); } - } - - @BeforeClass - public static void setupCluster() throws Exception { - // setup and verify the config - Configuration conf = UTIL.getConfiguration(); - setUpConfigForMiniCluster(conf); - IndexTestingUtils.setupConfig(conf); - IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf); - // start the cluster - UTIL.startMiniCluster(); - } - - @AfterClass - public static void teardownCluster() throws Exception { - UTIL.shutdownMiniCluster(); - } - - /** - * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't - * rethrowing the exception correctly? - * <p> - * We use a custom codec to enforce the thrown exception. - * @throws Exception - */ - @Test(timeout = 300000) - public void testQuickFailure() throws Exception { - // incorrectly setup indexing for the primary table - target index table doesn't exist, which - // should quickly return to the client - byte[] family = Bytes.toBytes("family"); - ColumnGroup fam1 = new ColumnGroup(getIndexTableName()); - // values are [col1] - fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS)); - CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder(); - // add the index family - builder.addIndexGroup(fam1); - // usually, we would create the index table here, but we don't for the sake of the test. 
- - // setup the primary table - String primaryTable = Bytes.toString(table.getTableName()); - @SuppressWarnings("deprecation") - HTableDescriptor pTable = new HTableDescriptor(primaryTable); - pTable.addFamily(new HColumnDescriptor(family)); - // override the codec so we can use our test one - builder.build(pTable, FailingTestCodec.class); - - // create the primary table - HBaseAdmin admin = UTIL.getHBaseAdmin(); - admin.createTable(pTable); - Configuration conf = new Configuration(UTIL.getConfiguration()); - // up the number of retries/wait time to make it obvious that we are failing with retries here - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20); - conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000); - HTable primary = new HTable(conf, primaryTable); - primary.setAutoFlush(false, true); - - // do a simple put that should be indexed - Put p = new Put(Bytes.toBytes("row")); - p.add(family, null, Bytes.toBytes("value")); - primary.put(p); - try { - primary.flushCommits(); - fail("Shouldn't have gotten a successful write to the primary table"); - } catch (RetriesExhaustedWithDetailsException e) { - LOG.info("Correclty got a failure of the put!"); + /** + * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't rethrowing the exception + * correctly? + * <p> + * We use a custom codec to enforce the thrown exception. 
+ * + * @throws Exception + */ + @Test(timeout = 300000) + public void testQuickFailure() throws Exception { + // incorrectly setup indexing for the primary table - target index table doesn't exist, which + // should quickly return to the client + byte[] family = Bytes.toBytes("family"); + ColumnGroup fam1 = new ColumnGroup(getIndexTableName()); + // values are [col1] + fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS)); + CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder(); + // add the index family + builder.addIndexGroup(fam1); + // usually, we would create the index table here, but we don't for the sake of the test. + + // setup the primary table + String primaryTable = Bytes.toString(table.getTableName()); + @SuppressWarnings("deprecation") + HTableDescriptor pTable = new HTableDescriptor(primaryTable); + pTable.addFamily(new HColumnDescriptor(family)); + // override the codec so we can use our test one + builder.build(pTable, FailingTestCodec.class); + + // create the primary table + HBaseAdmin admin = UTIL.getHBaseAdmin(); + admin.createTable(pTable); + Configuration conf = new Configuration(UTIL.getConfiguration()); + // up the number of retries/wait time to make it obvious that we are failing with retries here + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20); + conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000); + HTable primary = new HTable(conf, primaryTable); + primary.setAutoFlush(false, true); + + // do a simple put that should be indexed + Put p = new Put(Bytes.toBytes("row")); + p.add(family, null, Bytes.toBytes("value")); + primary.put(p); + try { + primary.flushCommits(); + fail("Shouldn't have gotten a successful write to the primary table"); + } catch (RetriesExhaustedWithDetailsException e) { + LOG.info("Correclty got a failure of the put!"); + } + primary.close(); } - primary.close(); - } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java index a6e36cb..bcadc2b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java @@ -19,8 +19,8 @@ package org.apache.phoenix.hbase.index; import java.io.IOException; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; public interface ValueGetter { @@ -32,7 +32,7 @@ public interface ValueGetter { * present. * @throws IOException if there is an error accessing the underlying data storage */ - public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException; + public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException; public byte[] getRowKey(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java index f9df296..dfb9ad4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java @@ -1,95 +1,127 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. 
 */ package org.apache.phoenix.hbase.index.builder; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.Collection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.hbase.index.covered.IndexCodec; +import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; /** * Basic implementation of the {@link IndexBuilder} that doesn't do any actual work of indexing. * <p> - * You should extend this class, rather than implementing IndexBuilder directly to maintain - * compatability going forward. + * You should extend this class, rather than implementing IndexBuilder directly to maintain compatibility going forward. * <p> - * Generally, you should consider using one of the implemented IndexBuilders (e.g - * {@link CoveredColumnsIndexBuilder}) as there is a lot of work required to keep an index table - * up-to-date. + * Generally, you should consider using one of the implemented IndexBuilders (e.g {@link NonTxIndexBuilder}) as there is + * a lot of work required to keep an index table up-to-date. 
*/ public abstract class BaseIndexBuilder implements IndexBuilder { + public static final String CODEC_CLASS_NAME_KEY = "org.apache.hadoop.hbase.index.codec.class"; + private static final Log LOG = LogFactory.getLog(BaseIndexBuilder.class); - private static final Log LOG = LogFactory.getLog(BaseIndexBuilder.class); - protected boolean stopped; + protected boolean stopped; + protected RegionCoprocessorEnvironment env; + protected IndexCodec codec; - @Override - public void extendBaseIndexBuilderInstead() { } - - @Override - public void setup(RegionCoprocessorEnvironment conf) throws IOException { - // noop - } + abstract protected boolean useRawScanToPrimeBlockCache(); + + @Override + public void extendBaseIndexBuilderInstead() {} - @Override - public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { - // noop - } + @Override + public void setup(RegionCoprocessorEnvironment env) throws IOException { + this.env = env; + // setup the phoenix codec. Generally, this will just be in standard one, but abstracting here + // so we can use it later when generalizing covered indexes + Configuration conf = env.getConfiguration(); + Class<? extends IndexCodec> codecClass = conf.getClass(CODEC_CLASS_NAME_KEY, null, IndexCodec.class); + try { + Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]); + meth.setAccessible(true); + this.codec = meth.newInstance(); + this.codec.initialize(env); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + } - @Override - public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) { - // noop - } - - /** - * By default, we always attempt to index the mutation. Commonly this can be slow (because the - * framework spends the time to do the indexing, only to realize that you don't need it) or not - * ideal (if you want to turn on/off indexing on a table without completely reloading it). 
- * @throws IOException - */ - @Override - public boolean isEnabled(Mutation m) throws IOException { - return true; - } + @Override + public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + // noop + } - /** - * {@inheritDoc} - * <p> - * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each - * mutation always applies to different rows, even if they are in the same batch, or are - * independent updates. - */ - @Override - public byte[] getBatchId(Mutation m) { - return null; - } + @Override + public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) { + // noop + } - @Override - public void stop(String why) { - LOG.debug("Stopping because: " + why); - this.stopped = true; - } + /** + * By default, we always attempt to index the mutation. Commonly this can be slow (because the framework spends the + * time to do the indexing, only to realize that you don't need it) or not ideal (if you want to turn on/off + * indexing on a table without completely reloading it). + * + * @throws IOException + */ + @Override + public boolean isEnabled(Mutation m) throws IOException { + // ask the codec to see if we should even attempt indexing + return this.codec.isEnabled(m); + } - @Override - public boolean isStopped() { - return this.stopped; - } + /** + * Exposed for testing! + * + * @param codec + * codec to use for this instance of the builder + */ + public void setIndexCodecForTesting(IndexCodec codec) { + this.codec = codec; + } + + @Override + public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered) + throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc} + * <p> + * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each mutation always applies + * to different rows, even if they are in the same batch, or are independent updates. 
+ */ + @Override + public byte[] getBatchId(Mutation m) { + return this.codec.getBatchId(m); + } + + @Override + public void stop(String why) { + LOG.debug("Stopping because: " + why); + this.stopped = true; + } + + @Override + public boolean isStopped() { + return this.stopped; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java index ba9534c..d5fd34d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java @@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; @@ -156,19 +155,6 @@ public class IndexBuildManager implements Stoppable { return results; } - public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException { - // all we get is a single update, so it would probably just go slower if we needed to queue it - // up. It will increase underlying resource contention a little bit, but the mutation case is - // far more common, so let's not worry about it for now. - // short circuit so we don't waste time. 
- if (!this.delegate.isEnabled(delete)) { - return null; - } - - return delegate.getIndexUpdate(delete); - - } - public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows( Collection<KeyValue> filtered) throws IOException { // this is run async, so we can take our time here http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java index b91a52a..194fdcc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java @@ -24,7 +24,6 @@ import java.util.Map; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -79,32 +78,26 @@ public interface IndexBuilder extends Stoppable { public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException; - /** - * The counter-part to {@link #getIndexUpdate(Mutation)} - your opportunity to update any/all - * index tables based on the delete of the primary table row. This is only called for cases where - * the client sends a single delete ({@link HTable#delete}). We separate this method from - * {@link #getIndexUpdate(Mutation)} only for the ease of implementation as the delete path has - * subtly different semantics for updating the families/timestamps from the generic batch path. 
- * <p> - * Its up to your implementation to ensure that timestamps match between the primary and index - * tables. - * <p> - * Implementers must ensure that this method is thread-safe - it could (and probably will) be - * called concurrently for different mutations, which may or may not be part of the same batch. - * @param delete {@link Delete} to the primary table that may be indexed - * @return a {@link Map} of the mutations to make -> target index table name - * @throws IOException on failure - */ - public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException; - - /** - * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal - * flush or compaction mechanisms. - * @param filtered {@link KeyValue}s that previously existed, but won't be included in further - * output from HBase. - * @return a {@link Map} of the mutations to make -> target index table name - * @throws IOException on failure - */ + /** + * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal flush or compaction + * mechanisms. Currently not implemented by any implementors nor called, but left here to be implemented if we + * ever need it. In Jesse's words: + * + * Arguably, this is a correctness piece that should be used, but isn't. Basically, it *could* be that + * if a compaction/flush were to remove a key (too old, too many versions) you might want to cleanup the index table + * as well, if it were to get out of sync with the primary table. For instance, you might get multiple versions of + * the same row, which should eventually age off the oldest version. However, in the index table there would only + * ever be two entries for that row - the first one, matching the original row, and the delete marker for the index + * update, set when we got a newer version of the primary row. 
So, a basic HBase scan wouldn't show the index update + * b/c its covered by the delete marker, but an older timestamp based read would actually show the index row, even + * after the primary table row is gone due to MAX_VERSIONS requirement. + * + * @param filtered {@link KeyValue}s that previously existed, but won't be included + * in further output from HBase. + * + * @return a {@link Map} of the mutations to make -> target index table name + * @throws IOException on failure + */ public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows( Collection<KeyValue> filtered) throws IOException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java index e707ea2..5e0da3c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java @@ -20,6 +20,7 @@ package org.apache.phoenix.hbase.index.covered; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; /** @@ -27,9 +28,9 @@ import org.apache.hadoop.hbase.KeyValue; */ public class Batch { - private static final long pointDeleteCode = KeyValue.Type.Delete.getCode(); + private static final byte pointDeleteCode = KeyValue.Type.Delete.getCode(); private final long timestamp; - private List<KeyValue> batch = new ArrayList<KeyValue>(); + private List<Cell> batch = new ArrayList<Cell>(); private boolean allPointDeletes = true; /** @@ -39,8 +40,8 @@ public class Batch { this.timestamp = ts; } - public void add(KeyValue kv){ - if (pointDeleteCode != kv.getType()) { + public void add(Cell kv){ + if (pointDeleteCode != 
kv.getTypeByte()) { allPointDeletes = false; } batch.add(kv); @@ -54,7 +55,7 @@ public class Batch { return this.timestamp; } - public List<KeyValue> getKvs() { + public List<Cell> getKvs() { return this.batch; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java deleted file mode 100644 index 6524fd4..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java +++ /dev/null @@ -1,491 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.hbase.index.covered; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; - -import com.google.common.collect.Lists; -import com.google.common.primitives.Longs; -import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder; -import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; -import org.apache.phoenix.hbase.index.covered.data.LocalTable; -import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; -import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager; -import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; - -/** - * Build covered indexes for phoenix updates. - * <p> - * Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't - * need to do any extra synchronization in the IndexBuilder. - * <p> - * NOTE: This implementation doesn't cleanup the index when we remove a key-value on compaction or - * flush, leading to a bloated index that needs to be cleaned up by a background process. 
- */ -public class CoveredColumnsIndexBuilder extends BaseIndexBuilder { - - private static final Log LOG = LogFactory.getLog(CoveredColumnsIndexBuilder.class); - public static final String CODEC_CLASS_NAME_KEY = "org.apache.hadoop.hbase.index.codec.class"; - - protected RegionCoprocessorEnvironment env; - protected IndexCodec codec; - protected LocalHBaseState localTable; - - @Override - public void setup(RegionCoprocessorEnvironment env) throws IOException { - this.env = env; - // setup the phoenix codec. Generally, this will just be in standard one, but abstracting here - // so we can use it later when generalizing covered indexes - Configuration conf = env.getConfiguration(); - Class<? extends IndexCodec> codecClass = - conf.getClass(CODEC_CLASS_NAME_KEY, null, IndexCodec.class); - try { - Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]); - meth.setAccessible(true); - this.codec = meth.newInstance(); - this.codec.initialize(env); - } catch (IOException e) { - throw e; - } catch (Exception e) { - throw new IOException(e); - } - - this.localTable = new LocalTable(env); - } - - @Override - public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException { - // build the index updates for each group - IndexUpdateManager updateMap = new IndexUpdateManager(); - - batchMutationAndAddUpdates(updateMap, mutation); - - if (LOG.isDebugEnabled()) { - LOG.debug("Found index updates for Mutation: " + mutation + "\n" + updateMap); - } - - return updateMap.toMap(); - } - - /** - * Split the mutation into batches based on the timestamps of each keyvalue. We need to check each - * key-value in the update to see if it matches the others. Generally, this will be the case, but - * you can add kvs to a mutation that don't all have the timestamp, so we need to manage - * everything in batches based on timestamp. - * <p> - * Adds all the updates in the {@link Mutation} to the state, as a side-effect. 
- * @param updateMap index updates into which to add new updates. Modified as a side-effect. - * @param state current state of the row for the mutation. - * @param m mutation to batch - * @throws IOException - */ - private void batchMutationAndAddUpdates(IndexUpdateManager manager, Mutation m) throws IOException { - // split the mutation into timestamp-based batches - Collection<Batch> batches = createTimestampBatchesFromMutation(m); - - // create a state manager, so we can manage each batch - LocalTableState state = new LocalTableState(env, localTable, m); - - // go through each batch of keyvalues and build separate index entries for each - boolean cleanupCurrentState = true; - for (Batch batch : batches) { - /* - * We have to split the work between the cleanup and the update for each group because when we - * update the current state of the row for the current batch (appending the mutations for the - * current batch) the next group will see that as the current state, which will can cause the - * a delete and a put to be created for the next group. - */ - if (addMutationsForBatch(manager, batch, state, cleanupCurrentState)) { - cleanupCurrentState = false; - } - } - } - - /** - * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any - * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at - * the time the method is called. 
- * @param m {@link Mutation} from which to extract the {@link KeyValue}s - * @return the mutation, broken into batches and sorted in ascending order (smallest first) - */ - protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) { - Map<Long, Batch> batches = new HashMap<Long, Batch>(); - for (List<Cell> family : m.getFamilyCellMap().values()) { - List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family); - createTimestampBatchesFromKeyValues(familyKVs, batches); - } - // sort the batches - List<Batch> sorted = new ArrayList<Batch>(batches.values()); - Collections.sort(sorted, new Comparator<Batch>() { - @Override - public int compare(Batch o1, Batch o2) { - return Longs.compare(o1.getTimestamp(), o2.getTimestamp()); - } - }); - return sorted; - } - - /** - * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any - * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at - * the time the method is called. - * @param kvs {@link KeyValue}s to break into batches - * @param batches to update with the given kvs - */ - protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs, - Map<Long, Batch> batches) { - long now = EnvironmentEdgeManager.currentTimeMillis(); - byte[] nowBytes = Bytes.toBytes(now); - - // batch kvs by timestamp - for (KeyValue kv : kvs) { - long ts = kv.getTimestamp(); - // override the timestamp to the current time, so the index and primary tables match - // all the keys with LATEST_TIMESTAMP will then be put into the same batch - if (kv.updateLatestStamp(nowBytes)) { - ts = now; - } - Batch batch = batches.get(ts); - if (batch == null) { - batch = new Batch(ts); - batches.put(ts, batch); - } - batch.add(kv); - } - } - - /** - * For a single batch, get all the index updates and add them to the updateMap - * <p> - * This method manages cleaning up the entire history of the row from the given timestamp forward - * for out-of-order (e.g. 
'back in time') updates. - * <p> - * If things arrive out of order (client is using custom timestamps) we should still see the index - * in the correct order (assuming we scan after the out-of-order update in finished). Therefore, - * we when we aren't the most recent update to the index, we need to delete the state at the - * current timestamp (similar to above), but also issue a delete for the added index updates at - * the next newest timestamp of any of the columns in the update; we need to cleanup the insert so - * it looks like it was also deleted at that next newest timestamp. However, its not enough to - * just update the one in front of us - that column will likely be applied to index entries up the - * entire history in front of us, which also needs to be fixed up. - * <p> - * However, the current update usually will be the most recent thing to be added. In that case, - * all we need to is issue a delete for the previous index row (the state of the row, without the - * update applied) at the current timestamp. This gets rid of anything currently in the index for - * the current state of the row (at the timestamp). Then we can just follow that by applying the - * pending update and building the index update based on the new row state. - * @param updateMap map to update with new index elements - * @param batch timestamp-based batch of edits - * @param state local state to update and pass to the codec - * @param requireCurrentStateCleanup <tt>true</tt> if we should should attempt to cleanup the - * current state of the table, in the event of a 'back in time' batch. <tt>false</tt> - * indicates we should not attempt the cleanup, e.g. an earlier batch already did the - * cleanup. 
- * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put), - * <tt>false</tt> otherwise - * @throws IOException - */ - private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch, - LocalTableState state, boolean requireCurrentStateCleanup) throws IOException { - - // need a temporary manager for the current batch. It should resolve any conflicts for the - // current batch. Essentially, we can get the case where a batch doesn't change the current - // state of the index (all Puts are covered by deletes), in which case we don't want to add - // anything - // A. Get the correct values for the pending state in the batch - // A.1 start by cleaning up the current state - as long as there are key-values in the batch - // that are indexed, we need to change the current state of the index. Its up to the codec to - // determine if we need to make any cleanup given the pending update. - long batchTs = batch.getTimestamp(); - state.setPendingUpdates(batch.getKvs()); - addCleanupForCurrentBatch(updateMap, batchTs, state); - - // A.2 do a single pass first for the updates to the current state - state.applyPendingUpdates(); - long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap); - // if all the updates are the latest thing in the index, we are done - don't go and fix history - if (ColumnTracker.isNewestTime(minTs)) { - return false; - } - - // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the - // index. after this, we have the correct view of the index, from the batch up to the index - while(!ColumnTracker.isNewestTime(minTs) ){ - minTs = addUpdateForGivenTimestamp(minTs, state, updateMap); - } - - // B. only cleanup the current state if we need to - its a huge waste of effort otherwise. - if (requireCurrentStateCleanup) { - // roll back the pending update. This is needed so we can remove all the 'old' index entries. 
- // We don't need to do the puts here, but just the deletes at the given timestamps since we - // just want to completely hide the incorrect entries. - state.rollback(batch.getKvs()); - // setup state - state.setPendingUpdates(batch.getKvs()); - - // cleanup the pending batch. If anything in the correct history is covered by Deletes used to - // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both - // because the update may have a different set of columns or value based on the update). - cleanupIndexStateFromBatchOnward(updateMap, batchTs, state); - - // have to roll the state forward again, so the current state is correct - state.applyPendingUpdates(); - return true; - } - return false; - } - - private long addUpdateForGivenTimestamp(long ts, LocalTableState state, - IndexUpdateManager updateMap) throws IOException { - state.setCurrentTimestamp(ts); - ts = addCurrentStateMutationsForBatch(updateMap, state); - return ts; - } - - private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, - LocalTableState state) throws IOException { - // get the cleanup for the current state - state.setCurrentTimestamp(batchTs); - addDeleteUpdatesToMap(updateMap, state, batchTs); - // ignore any index tracking from the delete - state.resetTrackedColumns(); - } - - /** - * Add the necessary mutations for the pending batch on the local state. Handles rolling up - * through history to determine the index changes after applying the batch (for the case where the - * batch is back in time). - * @param updateMap to update with index mutations - * @param batch to apply to the current state - * @param state current state of the table - * @return the minimum timestamp across all index columns requested. If - * {@link ColumnTracker#isNewestTime(long)} returns <tt>true</tt> on the returned - * timestamp, we know that this <i>was not a back-in-time update</i>. 
- * @throws IOException - */ - private long - addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state) throws IOException { - - // get the index updates for this current batch - Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state); - state.resetTrackedColumns(); - - /* - * go through all the pending updates. If we are sure that all the entries are the latest - * timestamp, we can just add the index updates and move on. However, if there are columns that - * we skip past (based on the timestamp of the batch), we need to roll back up the history. - * Regardless of whether or not they are the latest timestamp, the entries here are going to be - * correct for the current batch timestamp, so we add them to the updates. The only thing we - * really care about it if we need to roll up the history and fix it as we go. - */ - // timestamp of the next update we need to track - long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP; - List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>(); - for (IndexUpdate update : upserts) { - // this is the one bit where we check the timestamps - final ColumnTracker tracker = update.getIndexedColumns(); - long trackerTs = tracker.getTS(); - // update the next min TS we need to track - if (trackerTs < minTs) { - minTs = tracker.getTS(); - } - // track index hints for the next round. Hint if we need an update for that column for the - // next timestamp. These columns clearly won't need to update as we go through time as they - // already match the most recent possible thing. - boolean needsCleanup = false; - if (tracker.hasNewerTimestamps()) { - columnHints.add(tracker); - // this update also needs to be cleaned up at the next timestamp because it not the latest. 
- needsCleanup = true; - } - - - // only make the put if the index update has been setup - if (update.isValid()) { - byte[] table = update.getTableName(); - Mutation mutation = update.getUpdate(); - updateMap.addIndexUpdate(table, mutation); - - // only make the cleanup if we made a put and need cleanup - if (needsCleanup) { - // there is a TS for the interested columns that is greater than the columns in the - // put. Therefore, we need to issue a delete at the same timestamp - Delete d = new Delete(mutation.getRow()); - d.setTimestamp(tracker.getTS()); - updateMap.addIndexUpdate(table, d); - } - } - } - return minTs; - } - - /** - * Cleanup the index based on the current state from the given batch. Iterates over each timestamp - * (for the indexed rows) for the current state of the table and cleans up all the existing - * entries generated by the codec. - * <p> - * Adds all pending updates to the updateMap - * @param updateMap updated with the pending index updates from the codec - * @param batchTs timestamp from which we should cleanup - * @param state current state of the primary table. Should already by setup to the correct state - * from which we want to cleanup. 
- * @throws IOException - */ - private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, - long batchTs, LocalTableState state) throws IOException { - // get the cleanup for the current state - state.setCurrentTimestamp(batchTs); - addDeleteUpdatesToMap(updateMap, state, batchTs); - Set<ColumnTracker> trackers = state.getTrackedColumns(); - long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP; - for (ColumnTracker tracker : trackers) { - if (tracker.getTS() < minTs) { - minTs = tracker.getTS(); - } - } - state.resetTrackedColumns(); - if (!ColumnTracker.isNewestTime(minTs)) { - state.setHints(Lists.newArrayList(trackers)); - cleanupIndexStateFromBatchOnward(updateMap, minTs, state); - } - } - - - /** - * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then - * add them to the update map. - * <p> - * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates - * applied, etc). - * @throws IOException - */ - protected void - addDeleteUpdatesToMap(IndexUpdateManager updateMap, - LocalTableState state, long ts) throws IOException { - Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state); - if (cleanup != null) { - for (IndexUpdate d : cleanup) { - if (!d.isValid()) { - continue; - } - // override the timestamps in the delete to match the current batch. 
- Delete remove = (Delete)d.getUpdate(); - remove.setTimestamp(ts); - updateMap.addIndexUpdate(d.getTableName(), remove); - } - } - } - - @Override - public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete d) throws IOException { - // stores all the return values - IndexUpdateManager updateMap = new IndexUpdateManager(); - - // We have to figure out which kind of delete it is, since we need to do different things if its - // a general (row) delete, versus a delete of just a single column or family - Map<byte[], List<Cell>> families = d.getFamilyCellMap(); - - /* - * Option 1: its a row delete marker, so we just need to delete the most recent state for each - * group, as of the specified timestamp in the delete. This can happen if we have a single row - * update and it is part of a batch mutation (prepare doesn't happen until later... maybe a - * bug?). In a single delete, this delete gets all the column families appended, so the family - * map won't be empty by the time it gets here. - */ - if (families.size() == 0) { - LocalTableState state = new LocalTableState(env, localTable, d); - // get a consistent view of name - long now = d.getTimeStamp(); - if (now == HConstants.LATEST_TIMESTAMP) { - now = EnvironmentEdgeManager.currentTimeMillis(); - // update the delete's idea of 'now' to be consistent with the index - d.setTimestamp(now); - } - // get deletes from the codec - // we only need to get deletes and not add puts because this delete covers all columns - addDeleteUpdatesToMap(updateMap, state, now); - - /* - * Update the current state for all the kvs in the delete. Generally, we would just iterate - * the family map, but since we go here, the family map is empty! Therefore, we need to fake a - * bunch of family deletes (just like hos HRegion#prepareDelete works). This is just needed - * for current version of HBase that has an issue where the batch update doesn't update the - * deletes before calling the hook. 
- */ - byte[] deleteRow = d.getRow(); - for (byte[] family : this.env.getRegion().getTableDesc().getFamiliesKeys()) { - state.addPendingUpdates(new KeyValue(deleteRow, family, null, now, - KeyValue.Type.DeleteFamily)); - } - } else { - // Option 2: Its actually a bunch single updates, which can have different timestamps. - // Therefore, we need to do something similar to the put case and batch by timestamp - batchMutationAndAddUpdates(updateMap, d); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Found index updates for Delete: " + d + "\n" + updateMap); - } - - return updateMap.toMap(); - } - - @Override - public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows( - Collection<KeyValue> filtered) throws IOException { - // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows - return null; - } - - /** - * Exposed for testing! - * @param codec codec to use for this instance of the builder - */ - public void setIndexCodecForTesting(IndexCodec codec) { - this.codec = codec; - } - - @Override - public boolean isEnabled(Mutation m) throws IOException { - // ask the codec to see if we should even attempt indexing - return this.codec.isEnabled(m); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java index daa631b..e3ef831 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java @@ -1,19 +1,11 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. */ package org.apache.phoenix.hbase.index.covered; @@ -22,89 +14,93 @@ import java.io.IOException; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; - import org.apache.phoenix.index.BaseIndexCodec; - /** * Codec for creating index updates from the current state of a table. 
* <p> - * Generally, you should extend {@link BaseIndexCodec} instead, so help maintain compatibility as - * features need to be added to the codec, as well as potentially not haivng to implement some - * methods. + * Generally, you should extend {@link BaseIndexCodec} instead, so help maintain compatibility as features need to be + * added to the codec, as well as potentially not haivng to implement some methods. */ public interface IndexCodec { - /** - * Do any code initialization necessary - * @param env environment in which the codec is operating - * @throws IOException if the codec cannot be initalized correctly - */ - public void initialize(RegionCoprocessorEnvironment env) throws IOException; + /** + * Do any code initialization necessary + * + * @param env + * environment in which the codec is operating + * @throws IOException + * if the codec cannot be initalized correctly + */ + public void initialize(RegionCoprocessorEnvironment env) throws IOException; + + /** + * Get the index cleanup entries. Currently, this must return just single row deletes (where just the row-key is + * specified and no columns are returned) mapped to the table name. For instance, to you have an index 'myIndex' + * with row : + * + * <pre> + * v1,v2,v3 | CF:CQ0 | rowkey + * | CF:CQ1 | rowkey + * </pre> + * + * To then cleanup this entry, you would just return 'v1,v2,v3', 'myIndex'. + * + * @param state + * the current state of the table that needs to be cleaned up. Generally, you only care about the latest + * column values, for each column you are indexing for each index table. + * @return the pairs of (deletes, index table name) that should be applied. + * @throws IOException + */ + public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException; - /** - * Get the index cleanup entries. Currently, this must return just single row deletes (where just - * the row-key is specified and no columns are returned) mapped to the table name. 
For instance, - * to you have an index 'myIndex' with row : - * - * <pre> - * v1,v2,v3 | CF:CQ0 | rowkey - * | CF:CQ1 | rowkey - * </pre> - * - * To then cleanup this entry, you would just return 'v1,v2,v3', 'myIndex'. - * @param state the current state of the table that needs to be cleaned up. Generally, you only - * care about the latest column values, for each column you are indexing for each index - * table. - * @return the pairs of (deletes, index table name) that should be applied. - * @throws IOException - */ - public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException; + // table state has the pending update already applied, before calling + // get the new index entries + /** + * Get the index updates for the primary table state, for each index table. The returned {@link Put}s need to be + * fully specified (including timestamp) to minimize passes over the same key-values multiple times. + * <p> + * You must specify the same timestamps on the Put as {@link TableState#getCurrentTimestamp()} so the index entries + * match the primary table row. This could be managed at a higher level, but would require iterating all the kvs in + * the Put again - very inefficient when compared to the current interface where you must provide a timestamp + * anyways (so you might as well provide the right one). + * + * @param state + * the current state of the table that needs to an index update Generally, you only care about the latest + * column values, for each column you are indexing for each index table. + * @return the pairs of (updates,index table name) that should be applied. + * @throws IOException + */ + public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException; - // table state has the pending update already applied, before calling - // get the new index entries - /** - * Get the index updates for the primary table state, for each index table. 
The returned - * {@link Put}s need to be fully specified (including timestamp) to minimize passes over the same - * key-values multiple times. - * <p> - * You must specify the same timestamps on the Put as {@link TableState#getCurrentTimestamp()} so - * the index entries match the primary table row. This could be managed at a higher level, but - * would require iterating all the kvs in the Put again - very inefficient when compared to the - * current interface where you must provide a timestamp anyways (so you might as well provide the - * right one). - * @param state the current state of the table that needs to an index update Generally, you only - * care about the latest column values, for each column you are indexing for each index - * table. - * @return the pairs of (updates,index table name) that should be applied. - * @throws IOException - */ - public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException; + /** + * This allows the codec to dynamically change whether or not indexing should take place for a table. If it doesn't + * take place, we can save a lot of time on the regular Put patch. By making it dynamic, we can save offlining and + * then onlining a table just to turn indexing on. + * <p> + * We can also be smart about even indexing a given update here too - if the update doesn't contain any columns that + * we care about indexing, we can save the effort of analyzing the put and further. + * + * @param m + * mutation that should be indexed. + * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table basis, as each + * codec is instantiated per-region. + * @throws IOException + */ + public boolean isEnabled(Mutation m) throws IOException; - /** - * This allows the codec to dynamically change whether or not indexing should take place for a - * table. If it doesn't take place, we can save a lot of time on the regular Put patch. 
By making - * it dynamic, we can save offlining and then onlining a table just to turn indexing on. - * <p> - * We can also be smart about even indexing a given update here too - if the update doesn't - * contain any columns that we care about indexing, we can save the effort of analyzing the put - * and further. - * @param m mutation that should be indexed. - * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table - * basis, as each codec is instantiated per-region. - * @throws IOException - */ - public boolean isEnabled(Mutation m) throws IOException; + /** + * Get the batch identifier of the given mutation. Generally, updates to the table will take place in a batch of + * updates; if we know that the mutation is part of a batch, we can build the state much more intelligently. + * <p> + * <b>If you have batches that have multiple updates to the same row state, you must specify a batch id for each + * batch. Otherwise, we cannot guarantee index correctness</b> + * + * @param m + * mutation that may or may not be part of the batch + * @return <tt>null</tt> if the mutation is not part of a batch or an id for the batch. + */ + public byte[] getBatchId(Mutation m); - /** - * Get the batch identifier of the given mutation. Generally, updates to the table will take place - * in a batch of updates; if we know that the mutation is part of a batch, we can build the state - * much more intelligently. - * <p> - * <b>If you have batches that have multiple updates to the same row state, you must specify a - * batch id for each batch. Otherwise, we cannot guarantee index correctness</b> - * @param m mutation that may or may not be part of the batch - * @return <tt>null</tt> if the mutation is not part of a batch or an id for the batch. 
- */ - public byte[] getBatchId(Mutation m); + public void setContext(TableState state, Mutation mutation) throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java index e4bc193..f47a71a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java @@ -1,19 +1,11 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. */ package org.apache.phoenix.hbase.index.covered; @@ -34,7 +26,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.util.Pair; - +import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.data.IndexMemStore; import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; @@ -42,203 +34,250 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; import org.apache.phoenix.hbase.index.scanner.Scanner; import org.apache.phoenix.hbase.index.scanner.ScannerBuilder; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; + +import com.google.common.collect.Maps; /** * Manage the state of the HRegion's view of the table, for the single row. * <p> - * Currently, this is a single-use object - you need to create a new one for each row that you need - * to manage. In the future, we could make this object reusable, but for the moment its easier to - * manage as a throw-away object. + * Currently, this is a single-use object - you need to create a new one for each row that you need to manage. In the + * future, we could make this object reusable, but for the moment its easier to manage as a throw-away object. 
* <p> - * This class is <b>not</b> thread-safe - it requires external synchronization is access - * concurrently. + * This class is <b>not</b> thread-safe - it requires external synchronization if accessed concurrently. */ public class LocalTableState implements TableState { - private long ts; - private RegionCoprocessorEnvironment env; - private KeyValueStore memstore; - private LocalHBaseState table; - private Mutation update; - private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>(); - private ScannerBuilder scannerBuilder; - private List<KeyValue> kvs = new ArrayList<KeyValue>(); - private List<? extends IndexedColumnGroup> hints; - private CoveredColumns columnSet; - - public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) { - this.env = environment; - this.table = table; - this.update = update; - this.memstore = new IndexMemStore(); - this.scannerBuilder = new ScannerBuilder(memstore, update); - this.columnSet = new CoveredColumns(); - } - - public void addPendingUpdates(KeyValue... 
kvs) { - if (kvs == null) return; - addPendingUpdates(Arrays.asList(kvs)); - } - - public void addPendingUpdates(List<KeyValue> kvs) { - if(kvs == null) return; - setPendingUpdates(kvs); - addUpdate(kvs); - } - - private void addUpdate(List<KeyValue> list) { - addUpdate(list, true); - } - - private void addUpdate(List<KeyValue> list, boolean overwrite) { - if (list == null) return; - for (KeyValue kv : list) { - this.memstore.add(kv, overwrite); - } - } - - @Override - public RegionCoprocessorEnvironment getEnvironment() { - return this.env; - } - - @Override - public long getCurrentTimestamp() { - return this.ts; - } - - @Override - public void setCurrentTimestamp(long timestamp) { - this.ts = timestamp; - } - - public void resetTrackedColumns() { - this.trackedColumns.clear(); - } - - public Set<ColumnTracker> getTrackedColumns() { - return this.trackedColumns; - } - - @Override - public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState( - Collection<? extends ColumnReference> indexedColumns) throws IOException { - ensureLocalStateInitialized(indexedColumns); - // filter out things with a newer timestamp and track the column references to which it applies - ColumnTracker tracker = new ColumnTracker(indexedColumns); - synchronized (this.trackedColumns) { - // we haven't seen this set of columns before, so we need to create a new tracker - if (!this.trackedColumns.contains(tracker)) { - this.trackedColumns.add(tracker); - } - } - - Scanner scanner = - this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts); - - return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker)); - } - - /** - * Initialize the managed local state. Generally, this will only be called by - * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. - * Even then, there is still fairly low contention as each new Put/Delete will have its own table - * state. 
- */ - private synchronized void ensureLocalStateInitialized( - Collection<? extends ColumnReference> columns) throws IOException { - // check to see if we haven't initialized any columns yet - Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns); - // we have all the columns loaded, so we are good to go. - if (toCover.isEmpty()) { - return; - } - - // add the current state of the row - this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false); - - // add the covered columns to the set - for (ColumnReference ref : toCover) { - this.columnSet.addColumn(ref); - } - } - - @Override - public Map<String, byte[]> getUpdateAttributes() { - return this.update.getAttributesMap(); - } - - @Override - public byte[] getCurrentRowKey() { - return this.update.getRow(); - } - - public Result getCurrentRowState() { - KeyValueScanner scanner = this.memstore.getScanner(); - List<Cell> kvs = new ArrayList<Cell>(); - while (scanner.peek() != null) { - try { - kvs.add(scanner.next()); - } catch (IOException e) { - // this should never happen - something has gone terribly arwy if it has - throw new RuntimeException("Local MemStore threw IOException!"); - } - } - return Result.create(kvs); - } - - /** - * Helper to add a {@link Mutation} to the values stored for the current row - * @param pendingUpdate update to apply - */ - public void addUpdateForTesting(Mutation pendingUpdate) { - for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) { - List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue()); - addUpdate(edits); - } - } - - /** - * @param hints - */ - public void setHints(List<? extends IndexedColumnGroup> hints) { - this.hints = hints; - } - - @Override - public List<? 
extends IndexedColumnGroup> getIndexColumnHints() { - return this.hints; - } - - @Override - public Collection<KeyValue> getPendingUpdate() { - return this.kvs; - } - - /** - * Set the {@link KeyValue}s in the update for which we are currently building an index update, - * but don't actually apply them. - * @param update pending {@link KeyValue}s - */ - public void setPendingUpdates(Collection<KeyValue> update) { - this.kvs.clear(); - this.kvs.addAll(update); - } - - /** - * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}. - */ - public void applyPendingUpdates() { - this.addUpdate(kvs); - } - - /** - * Rollback all the given values from the underlying state. - * @param values - */ - public void rollback(Collection<KeyValue> values) { - for (KeyValue kv : values) { - this.memstore.rollback(kv); - } - } + private long ts; + private RegionCoprocessorEnvironment env; + private KeyValueStore memstore; + private LocalHBaseState table; + private Mutation update; + private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>(); + private ScannerBuilder scannerBuilder; + private List<Cell> kvs = new ArrayList<Cell>(); + private List<? extends IndexedColumnGroup> hints; + private CoveredColumns columnSet; + private final Map<String,Object> context = Maps.newHashMap(); + + public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) { + this.env = environment; + this.table = table; + this.update = update; + this.memstore = new IndexMemStore(); + this.scannerBuilder = new ScannerBuilder(memstore, update); + this.columnSet = new CoveredColumns(); + } + + public void addPendingUpdates(Cell... 
kvs) { + if (kvs == null) return; + addPendingUpdates(Arrays.asList(kvs)); + } + + public void addPendingUpdates(List<Cell> kvs) { + if (kvs == null) return; + setPendingUpdates(kvs); + addUpdate(kvs); + } + + private void addUpdate(List<Cell> list) { + addUpdate(list, true); + } + + private void addUpdate(List<Cell> list, boolean overwrite) { + if (list == null) return; + for (Cell kv : list) { + this.memstore.add(KeyValueUtil.ensureKeyValue(kv), overwrite); + } + } + + @Override + public RegionCoprocessorEnvironment getEnvironment() { + return this.env; + } + + @Override + public long getCurrentTimestamp() { + return this.ts; + } + + /** + * Set the current timestamp up to which the table should allow access to the underlying table. + * This overrides the timestamp view provided by the indexer - use with care! + * @param timestamp timestamp up to which the table should allow access. + */ + public void setCurrentTimestamp(long timestamp) { + this.ts = timestamp; + } + + public void resetTrackedColumns() { + this.trackedColumns.clear(); + } + + public Set<ColumnTracker> getTrackedColumns() { + return this.trackedColumns; + } + + /** + * Get a scanner on the columns that are needed by the index. + * <p> + * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given + * columns with a timestamp earlier than the timestamp to which the table is currently set (the + * current state of the table for which we need to build an update). + * <p> + * If none of the passed columns matches any of the columns in the pending update (as determined + * by {@link ColumnReference#matchesFamily(byte[])} and + * {@link ColumnReference#matchesQualifier(byte[])}, then an empty scanner will be returned. This + * is because it doesn't make sense to build index updates when there is no change in the table + * state for any of the columns you are indexing. 
+ * <p> + * <i>NOTE:</i> This method should <b>not</b> be used during + * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been + * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i> + * need to track the indexed columns. + * <p> + * As a side-effect, we update a timestamp for the next-most-recent timestamp for the columns you + * request - you will never see a column with the timestamp we are tracking, but the next oldest + * timestamp for that column. + * @param indexedColumns the columns to that will be indexed + * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to + * the builder. Even if no update is necessary for the requested columns, you still need + * to return the {@link IndexUpdate}, just don't set the update for the + * {@link IndexUpdate}. + * @throws IOException + */ + public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState( + Collection<? extends ColumnReference> indexedColumns) throws IOException { + ensureLocalStateInitialized(indexedColumns); + // filter out things with a newer timestamp and track the column references to which it applies + ColumnTracker tracker = new ColumnTracker(indexedColumns); + synchronized (this.trackedColumns) { + // we haven't seen this set of columns before, so we need to create a new tracker + if (!this.trackedColumns.contains(tracker)) { + this.trackedColumns.add(tracker); + } + } + + Scanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts); + + return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker)); + } + + /** + * Initialize the managed local state. Generally, this will only be called by + * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. Even + * then, there is still fairly low contention as each new Put/Delete will have its own table state. 
+ */ + private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns) + throws IOException { + // check to see if we haven't initialized any columns yet + Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns); + // we have all the columns loaded, so we are good to go. + if (toCover.isEmpty()) { return; } + + // add the current state of the row + this.addUpdate(this.table.getCurrentRowState(update, toCover).listCells(), false); + + // add the covered columns to the set + for (ColumnReference ref : toCover) { + this.columnSet.addColumn(ref); + } + } + + @Override + public Map<String, byte[]> getUpdateAttributes() { + return this.update.getAttributesMap(); + } + + @Override + public byte[] getCurrentRowKey() { + return this.update.getRow(); + } + + public Result getCurrentRowState() { + KeyValueScanner scanner = this.memstore.getScanner(); + List<Cell> kvs = new ArrayList<Cell>(); + while (scanner.peek() != null) { + try { + kvs.add(scanner.next()); + } catch (IOException e) { + // this should never happen - something has gone terribly awry if it has + throw new RuntimeException("Local MemStore threw IOException!"); + } + } + return Result.create(kvs); + } + + /** + * Helper to add a {@link Mutation} to the values stored for the current row + * + * @param pendingUpdate + * update to apply + */ + public void addUpdateForTesting(Mutation pendingUpdate) { + for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) { + List<Cell> edits = e.getValue(); + addUpdate(edits); + } + } + + /** + * @param hints + */ + public void setHints(List<? extends IndexedColumnGroup> hints) { + this.hints = hints; + } + + @Override + public List<? 
extends IndexedColumnGroup> getIndexColumnHints() { + return this.hints; + } + + @Override + public Collection<Cell> getPendingUpdate() { + return this.kvs; + } + + /** + * Set the {@link KeyValue}s in the update for which we are currently building an index update, but don't actually + * apply them. + * + * @param update + * pending {@link KeyValue}s + */ + public void setPendingUpdates(Collection<Cell> update) { + this.kvs.clear(); + this.kvs.addAll(update); + } + + /** + * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}. + */ + public void applyPendingUpdates() { + this.addUpdate(kvs); + } + + /** + * Rollback all the given values from the underlying state. + * + * @param values + */ + public void rollback(Collection<Cell> values) { + for (Cell kv : values) { + this.memstore.rollback(KeyValueUtil.ensureKeyValue(kv)); + } + } + + @Override + public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns) + throws IOException { + Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns); + ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey()); + return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond()); + } + + @Override + public Map<String, Object> getContext() { + return context; + } } \ No newline at end of file
