PHOENIX-4095 Prevent index from getting out of sync with data table during partial rebuild
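In broad strokes, the diff below does four things: (a) when the Indexer detects duplicate rows in a batch (which only happens during a partial rebuild, where a Put and a Delete for the same row can be replayed together), it copies the mutations and flattens them with the new IndexManagementUtil.flattenMutationsByTimestamp into one mutation per timestamp, so the index builder processes each point-in-time state of the row separately; (b) the computed index updates are carried from the pre-batch coprocessor hook to the post-batch hook in a ThreadLocal, a stopgap the code marks for removal once HBASE-18127 is available; (c) replayed rebuild mutations are written with Durability.SKIP_WAL, since they already exist in the data table; (d) the index/data-table consistency checks move out of TestUtil into a new IndexScrutiny utility.

Below is a minimal standalone sketch of the per-timestamp grouping idea, for readers who don't want to dig through the diff. It is illustrative only: SimpleCell and TimestampBatchingSketch are hypothetical stand-ins, not the HBase Cell API or Phoenix's Batch class.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class TimestampBatchingSketch {
        // Hypothetical stand-in for an HBase Cell: just a qualifier and a timestamp.
        static final class SimpleCell {
            final String qualifier;
            final long timestamp;
            SimpleCell(String qualifier, long timestamp) {
                this.qualifier = qualifier;
                this.timestamp = timestamp;
            }
        }

        // Group cells by timestamp, ascending, so each batch represents a single
        // point-in-time update of the row (the idea behind
        // IndexManagementUtil.createTimestampBatchesFromMutation).
        static TreeMap<Long, List<SimpleCell>> batchByTimestamp(List<SimpleCell> cells) {
            TreeMap<Long, List<SimpleCell>> batches = new TreeMap<Long, List<SimpleCell>>();
            for (SimpleCell cell : cells) {
                List<SimpleCell> batch = batches.get(cell.timestamp);
                if (batch == null) {
                    batch = new ArrayList<SimpleCell>();
                    batches.put(cell.timestamp, batch);
                }
                batch.add(cell);
            }
            return batches;
        }

        public static void main(String[] args) {
            List<SimpleCell> cells = new ArrayList<SimpleCell>();
            cells.add(new SimpleCell("V1", 1030L)); // replayed put
            cells.add(new SimpleCell("V2", 1030L));
            cells.add(new SimpleCell("V1", 1020L)); // older delete marker for the same row
            // TreeMap iteration yields ts=1020 before ts=1030, so index updates are
            // generated as if the mutations had arrived in timestamp order.
            for (Map.Entry<Long, List<SimpleCell>> e : batchByTimestamp(cells).entrySet()) {
                System.out.println("ts=" + e.getKey() + " -> " + e.getValue().size() + " cell(s)");
            }
        }
    }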
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/649b737a Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/649b737a Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/649b737a Branch: refs/heads/master Commit: 649b737a81243adc43b508a90addc9a2962c6bc1 Parents: ce6b891 Author: James Taylor <jamestay...@apache.org> Authored: Thu Aug 17 15:47:11 2017 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Thu Aug 17 16:56:32 2017 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/ConcurrentMutationsIT.java | 3 +- .../phoenix/end2end/OutOfOrderMutationsIT.java | 19 +- .../end2end/index/PartialIndexRebuilderIT.java | 197 +++++++++-- .../apache/phoenix/util/IndexScrutinyIT.java | 89 +++++ .../org/apache/phoenix/util/TestUtilIT.java | 89 ----- .../UngroupedAggregateRegionObserver.java | 5 + .../org/apache/phoenix/hbase/index/Indexer.java | 139 ++++---- .../hbase/index/covered/NonTxIndexBuilder.java | 93 +---- .../covered/example/CoveredColumnIndexer.java | 5 +- .../hbase/index/util/IndexManagementUtil.java | 94 +++++ .../hbase/index/covered/CoveredColumnsTest.java | 46 +++ .../index/covered/LocalTableStateTest.java | 293 ++++++++++++++++ .../index/covered/NonTxIndexBuilderTest.java | 341 +++++++++++++++++++ .../hbase/index/covered/TestCoveredColumns.java | 46 --- .../index/covered/TestLocalTableState.java | 293 ---------------- .../index/covered/TestNonTxIndexBuilder.java | 335 ------------------ .../org/apache/phoenix/util/IndexScrutiny.java | 144 ++++++++ .../java/org/apache/phoenix/util/TestUtil.java | 109 +----- 18 files changed, 1273 insertions(+), 1067 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java index d3e3761..e674d8f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java @@ -47,6 +47,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.IndexScrutiny; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -274,7 +275,7 @@ public class ConcurrentMutationsIT extends BaseUniqueNamesOwnClusterIT { } assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS)); - long actualRowCount = TestUtil.scrutinizeIndex(conn, tableName, indexName); + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); assertEquals(nRows, actualRowCount); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java 
index 5cdc1ee..e8adf6b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.util.IndexScrutiny; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; @@ -91,7 +92,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); conn = DriverManager.getConnection(getUrl(), props); - TestUtil.scrutinizeIndex(conn, tableName, indexName); + IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); assertNoTimeStampAt(conn, indexName, 1030); conn.close(); @@ -175,7 +176,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); conn = DriverManager.getConnection(getUrl(), props); - TestUtil.scrutinizeIndex(conn, tableName, indexName); + IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName); assertTrue(rs.next()); @@ -265,7 +266,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); - TestUtil.scrutinizeIndex(conn, tableName, indexName); + IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName); assertTrue(rs.next()); @@ -331,7 +332,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); - TestUtil.scrutinizeIndex(conn, tableName, indexName); + IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName); assertTrue(rs.next()); @@ -393,7 +394,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); - TestUtil.scrutinizeIndex(conn, tableName, indexName); + IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName); assertTrue(rs.next()); @@ -458,7 +459,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); - TestUtil.scrutinizeIndex(conn, tableName, indexName); + IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); ResultSet rs = 
conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName); assertTrue(rs.next()); @@ -523,7 +524,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); - TestUtil.scrutinizeIndex(conn, tableName, indexName); + IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName); assertTrue(rs.next()); @@ -587,7 +588,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); - long rowCount = TestUtil.scrutinizeIndex(conn, tableName, indexName); + long rowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); assertEquals(0,rowCount); conn.close(); @@ -640,7 +641,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); - long rowCount = TestUtil.scrutinizeIndex(conn, tableName, indexName); + long rowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); assertEquals(0,rowCount); conn.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index 8c43e3f..bc0dda8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -18,44 +18,203 @@ package org.apache.phoenix.end2end.index; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertTrue; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.SQLException; import java.util.Map; import java.util.Properties; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PMetaData; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.IndexScrutiny; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import 
org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.Repeat; +import org.apache.phoenix.util.RunUntilFailure; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; import com.google.common.collect.Maps; +@RunWith(RunUntilFailure.class) public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { + private static final Random RAND = new Random(5); + @BeforeClass public static void doSetup() throws Exception { Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString()); serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000"); serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "30000"); // give up rebuilding after 30 seconds - serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(1000)); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(2000)); setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS); } + private static boolean mutateRandomly(Connection conn, String fullTableName, int nRows) throws Exception { + return mutateRandomly(conn, fullTableName, nRows, false); + } + + private static boolean hasInactiveIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException { + PTable table = metaCache.getTableRef(key).getTable(); + for (PTable index : table.getIndexes()) { + if (index.getIndexState() == PIndexState.INACTIVE) { + return true; + } + } + return false; + } + + private static boolean isAllActiveIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException { + PTable table = metaCache.getTableRef(key).getTable(); + for (PTable index : table.getIndexes()) { + if (index.getIndexState() != PIndexState.ACTIVE) { + return false; + } + } + return true; + } + + private static boolean mutateRandomly(Connection conn, String fullTableName, int nRows, boolean checkForInactive) throws SQLException, InterruptedException { + PTableKey key = new PTableKey(null,fullTableName); + PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); + boolean hasInactiveIndex = false; + int batchSize = checkForInactive && !isAllActiveIndex(metaCache, key) ? 
1 : 200; + for (int i = 0; i < 10000; i++) { + int pk = Math.abs(RAND.nextInt()) % nRows; + int v1 = Math.abs(RAND.nextInt()) % nRows; + int v2 = Math.abs(RAND.nextInt()) % nRows; + if (checkForInactive && hasInactiveIndex(metaCache, key)) { + checkForInactive = false; + hasInactiveIndex = true; + batchSize = 200; + } + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES(" + pk + "," + v1 + "," + v2 + ")"); + if (i % batchSize == 0) { + conn.commit(); + if (checkForInactive) Thread.sleep(100); + } + } + conn.commit(); + for (int i = 0; i < 10000; i++) { + int pk = Math.abs(RAND.nextInt()) % nRows; + if (checkForInactive && hasInactiveIndex(metaCache, key)) { + checkForInactive = false; + hasInactiveIndex = true; + batchSize = 200; + } + conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k= " + pk); + if (i % batchSize == 0) { + conn.commit(); + } + } + conn.commit(); + for (int i = 0; i < 10000; i++) { + int pk = Math.abs(RAND.nextInt()) % nRows; + int v1 = Math.abs(RAND.nextInt()) % nRows; + int v2 = Math.abs(RAND.nextInt()) % nRows; + if (checkForInactive && hasInactiveIndex(metaCache, key)) { + checkForInactive = false; + hasInactiveIndex = true; + batchSize = 200; + } + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES(" + pk + "," + v1 + "," + v2 + ")"); + if (i % batchSize == 0) { + conn.commit(); + } + } + conn.commit(); + return hasInactiveIndex; + } + + @Test + @Repeat(20) + public void testDeleteAndUpsertAfterFailure() throws Throwable { + final int nRows = 10; + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k INTEGER PRIMARY KEY, v1 INTEGER, v2 INTEGER) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); + mutateRandomly(conn, fullTableName, nRows); + long disableTS = EnvironmentEdgeManager.currentTimeMillis(); + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); + mutateRandomly(conn, fullTableName, nRows); + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); + assertEquals(nRows,actualRowCount); + } + } + @Test + public void testWriteWhileRebuilding() throws Throwable { + final int nRows = 10; + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k INTEGER PRIMARY KEY, v1 INTEGER, v2 INTEGER) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); + mutateRandomly(conn, fullTableName, nRows); + long 
disableTS = EnvironmentEdgeManager.currentTimeMillis(); + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); + mutateRandomly(conn, fullTableName, nRows); + final boolean[] hasInactiveIndex = new boolean[1]; + final CountDownLatch doneSignal = new CountDownLatch(1); + Runnable r = new Runnable() { + + @Override + public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + hasInactiveIndex[0] = mutateRandomly(conn, fullTableName, nRows, true); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Thread t = new Thread(r); + t.setDaemon(true); + t.start(); + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + doneSignal.await(120, TimeUnit.SECONDS); + assertTrue(hasInactiveIndex[0]); + + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); + assertEquals(nRows,actualRowCount); + } + } + + @Test public void testMultiVersionsAfterFailure() throws Throwable { String schemaName = generateUniqueName(); String tableName = generateUniqueName(); @@ -77,10 +236,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee')"); conn.commit(); TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } } @@ -106,10 +263,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); conn.commit(); TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } } @@ -135,10 +290,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)"); conn.commit(); TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } } @@ -162,10 +315,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("DELETE FROM " + fullTableName); conn.commit(); TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - 
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } } @@ -189,10 +340,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')"); conn.commit(); TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } } @@ -220,10 +369,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn2.commit(); } TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } } @@ -251,10 +398,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn2.commit(); } TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } } @@ -282,10 +427,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn2.commit(); } TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); - TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java new file mode 100644 index 0000000..a703294 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java @@ -0,0 +1,89 @@ +package org.apache.phoenix.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; + +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.junit.Test; + +public class IndexScrutinyIT extends ParallelStatsDisabledIT { + @Test + public void 
testRowCountIndexScrutiny() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); + conn.commit(); + + int count = conn.createStatement().executeUpdate("DELETE FROM " + fullIndexName + " WHERE \":K\"='a' AND \"0:V\"='ccc'"); + assertEquals(1,count); + conn.commit(); + try { + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); + fail(); + } catch (AssertionError e) { + assertEquals(e.getMessage(),"Expected data table row count to match expected:<2> but was:<1>"); + } + } + } + @Test + public void testExtraRowIndexScrutiny() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')"); + conn.commit(); + + conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('bbb','x','0')"); + conn.commit(); + try { + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); + fail(); + } catch (AssertionError e) { + assertEquals(e.getMessage(),"Expected to find PK in data table: ('x')"); + } + } + } + + @Test + public void testValueIndexScrutiny() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')"); + conn.commit(); + + conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('ccc','a','2')"); + conn.commit(); + try { + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); + fail(); + } catch (AssertionError e) { + 
assertEquals(e.getMessage(),"Expected equality for V2, but '2'!='1'"); + } + } + } + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java deleted file mode 100644 index d26d9f6..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java +++ /dev/null @@ -1,89 +0,0 @@ -package org.apache.phoenix.util; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.sql.Connection; -import java.sql.DriverManager; - -import org.apache.phoenix.end2end.ParallelStatsDisabledIT; -import org.junit.Test; - -public class TestUtilIT extends ParallelStatsDisabledIT { - @Test - public void testRowCountIndexScrutiny() throws Throwable { - String schemaName = generateUniqueName(); - String tableName = generateUniqueName(); - String indexName = generateUniqueName(); - String fullTableName = SchemaUtil.getTableName(schemaName, tableName); - String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); - conn.commit(); - - int count = conn.createStatement().executeUpdate("DELETE FROM " + fullIndexName + " WHERE \":K\"='a' AND \"0:V\"='ccc'"); - assertEquals(1,count); - conn.commit(); - try { - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); - fail(); - } catch (AssertionError e) { - assertEquals(e.getMessage(),"Expected data table row count to match expected:<2> but was:<1>"); - } - } - } - @Test - public void testExtraRowIndexScrutiny() throws Throwable { - String schemaName = generateUniqueName(); - String tableName = generateUniqueName(); - String indexName = generateUniqueName(); - String fullTableName = SchemaUtil.getTableName(schemaName, tableName); - String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')"); - conn.commit(); - - conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('bbb','x','0')"); - conn.commit(); - try { - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); - fail(); - } catch (AssertionError e) { - assertEquals(e.getMessage(),"Expected to find PK in data table: ('x')"); - } - } - } - - @Test - public void testValueIndexScrutiny() throws Throwable { - String schemaName = generateUniqueName(); - String tableName = generateUniqueName(); - String indexName = generateUniqueName(); 
- String fullTableName = SchemaUtil.getTableName(schemaName, tableName); - String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')"); - conn.commit(); - - conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('ccc','a','2')"); - conn.commit(); - try { - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); - fail(); - } catch (AssertionError e) { - assertEquals(e.getMessage(),"Expected equality for V2, but '2'!='1'"); - } - } - } - - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index a07b5d0..298f9e9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -897,6 +898,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); mutations.add(put); + // Since we're replaying existing mutations, it makes no sense to write them to the wal + put.setDurability(Durability.SKIP_WAL); } put.add(cell); } else { @@ -907,6 +910,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); mutations.add(del); + // Since we're replaying existing mutations, it makes no sense to write them to the wal + del.setDurability(Durability.SKIP_WAL); } del.addDeleteMarker(cell); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index be475b7..492a367 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -123,6 +123,11 @@ public class Indexer extends BaseRegionObserver { protected IndexBuildManager builder; private LockManager lockManager; + // Hack to get around not being able to save any state between + 
// coprocessor calls. TODO: remove after HBASE-18127 when available + private ThreadLocal<Collection<Pair<Mutation, byte[]>>> indexUpdates = + new ThreadLocal<Collection<Pair<Mutation, byte[]>>>(); + /** Configuration key for the {@link IndexBuilder} to use */ public static final String INDEX_BUILDER_CONF_KEY = "index.builder"; @@ -480,9 +485,12 @@ public class Indexer extends BaseRegionObserver { } // Only copy mutations if we found duplicate rows - // (which is pretty much never for Phoenix) + // which only occurs when we're partially rebuilding + // the index (since we'll potentially have both a + // Put and a Delete mutation for the same row). if (copyMutations) { // Add the mutation to the batch set + ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); MultiMutation stored = mutationsMap.get(row); // we haven't seen this row before, so add it @@ -505,6 +513,9 @@ public class Indexer extends BaseRegionObserver { miniBatchOp.setWalEdit(0, edit); } + if (copyMutations) { + mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations); + } // get the current span, or just use a null-span to avoid a bunch of if statements try (TraceScope scope = Trace.startSpan("Starting to build index updates")) { @@ -543,42 +554,29 @@ public class Indexer extends BaseRegionObserver { miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()])); } - - // write them, either to WAL or the index tables - doPre(indexUpdates, edit, durability); + if (!indexUpdates.isEmpty()) { + setIndexUpdates(c, indexUpdates); + // write index updates to WAL + if (durability != Durability.SKIP_WAL) { + // we have all the WAL durability, so we just update the WAL entry and move on + for (Pair<Mutation, byte[]> entry : indexUpdates) { + edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst())); + } + } + } } } - /** - * Add the index updates to the WAL, or write to the index table, if the WAL has been disabled - * @return <tt>true</tt> if the WAL has been updated. 
- * @throws IOException - */ - private boolean doPre(Collection<Pair<Mutation, byte[]>> indexUpdates, final WALEdit edit, - final Durability durability) throws IOException { - // no index updates, so we are done - if (indexUpdates == null || indexUpdates.size() == 0) { - return false; - } - - // if writing to wal is disabled, we never see the WALEdit updates down the way, so do the index - // update right away - if (durability == Durability.SKIP_WAL) { - try { - this.writer.write(indexUpdates, false); - return false; - } catch (Throwable e) { - LOG.error("Failed to update index with entries:" + indexUpdates, e); - IndexManagementUtil.rethrowIndexingException(e); - } - } - - // we have all the WAL durability, so we just update the WAL entry and move on - for (Pair<Mutation, byte[]> entry : indexUpdates) { - edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst())); - } - - return true; + private void setIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c, Collection<Pair<Mutation, byte[]>> indexUpdates) { + this.indexUpdates.set(indexUpdates); + } + + private Collection<Pair<Mutation, byte[]>> getIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c) { + return this.indexUpdates.get(); + } + + private void removeIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c) { + this.indexUpdates.remove(); } @Override @@ -601,8 +599,7 @@ public class Indexer extends BaseRegionObserver { if (success) { // if miniBatchOp was successfully written, write index updates //each batch operation, only the first one will have anything useful, so we can just grab that Mutation mutation = miniBatchOp.getOperation(0); - WALEdit edit = miniBatchOp.getWalEdit(0); - doPost(edit, mutation, mutation.getDurability()); + doPost(c, mutation); } } finally { long duration = EnvironmentEdgeManager.currentTimeMillis() - start; @@ -616,22 +613,21 @@ } } - private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException { - try { - doPostWithExceptions(edit, m, durability); - return; - } catch (Throwable e) { - rethrowIndexingException(e); + private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, Mutation m) throws IOException { + try { + doPostWithExceptions(c, m); + return; + } catch (Throwable e) { + rethrowIndexingException(e); + } + throw new RuntimeException( + "Somehow didn't complete the index update, but didn't return successfully either!"); } - throw new RuntimeException( - "Somehow didn't complete the index update, but didn't return succesfully either!"); - } - private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability) - throws Exception { + private void doPostWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, Mutation m) + throws IOException { //short circuit, if we don't need to do any work - if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || edit == null) { - // already did the index update in prePut, so we are done + if (!this.builder.isEnabled(m)) { return; } @@ -643,42 +639,21 @@ } long start = EnvironmentEdgeManager.currentTimeMillis(); - // there is a little bit of excess here- we iterate all the non-indexed kvs for this check first - // and then do it again later when getting out the index updates.
This should be pretty minor - // though, compared to the rest of the runtime - IndexedKeyValue ikv = getFirstIndexedKeyValue(edit); - - /* - * early exit - we have nothing to write, so we don't need to do anything else. NOTE: we don't - * release the WAL Rolling lock (INDEX_UPDATE_LOCK) since we never take it in doPre if there are - * no index updates. - */ - if (ikv == null) { + + Collection<Pair<Mutation, byte[]>> indexUpdates = getIndexUpdates(c); + if (indexUpdates == null) { return; } - /* - * only write the update if we haven't already seen this batch. We only want to write the batch - * once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can - * lead to writing all the index updates for each Put/Delete). - */ - if (!ikv.getBatchFinished()) { - Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit); - - // the WAL edit is kept in memory and we already specified the factory when we created the - // references originally - therefore, we just pass in a null factory here and use the ones - // already specified on each reference - try { - current.addTimelineAnnotation("Actually doing index update for first time"); - writer.writeAndKillYourselfOnFailure(indexUpdates, false); - } finally { // With a custom kill policy, we may throw instead of kill the server. - // Without doing this in a finally block (at least with the mini cluster), - // the region server never goes down. - - // mark the batch as having been written. In the single-update case, this never gets check - // again, but in the batch case, we will check it again (see above). - ikv.markBatchFinished(); - } + // the WAL edit is kept in memory and we already specified the factory when we created the + // references originally - therefore, we just pass in a null factory here and use the ones + // already specified on each reference + try { + current.addTimelineAnnotation("Actually doing index update for first time"); + writer.writeAndKillYourselfOnFailure(indexUpdates, false); + } finally { // With a custom kill policy, we may throw instead of kill the server. 
+ // mark the batch as having been written so it won't be written again + removeIndexUpdates(c); } long duration = EnvironmentEdgeManager.currentTimeMillis() - start; http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java index 8d2bd83..c013b5b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java @@ -12,24 +12,17 @@ package org.apache.phoenix.hbase.index.covered; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder; import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; @@ -39,7 +32,6 @@ import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager; import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; import com.google.common.collect.Lists; -import com.google.common.primitives.Longs; /** * Build covered indexes for phoenix updates. @@ -94,76 +86,27 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { * @throws IOException */ private void batchMutationAndAddUpdates(IndexUpdateManager manager, LocalTableState state, Mutation m, IndexMetaData indexMetaData) throws IOException { - // split the mutation into timestamp-based batches - Collection<Batch> batches = createTimestampBatchesFromMutation(m); - - // go through each batch of keyvalues and build separate index entries for each - boolean cleanupCurrentState = !indexMetaData.isImmutableRows(); - for (Batch batch : batches) { - /* - * We have to split the work between the cleanup and the update for each group because when we update the - * current state of the row for the current batch (appending the mutations for the current batch) the next - * group will see that as the current state, which will can cause the a delete and a put to be created for - * the next group. - */ - if (addMutationsForBatch(manager, batch, state, cleanupCurrentState, indexMetaData)) { - cleanupCurrentState = false; - } - } - } - - /** - * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any {@link KeyValue} with a timestamp - * == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called. 
- * - * @param m - * {@link Mutation} from which to extract the {@link KeyValue}s - * @return the mutation, broken into batches and sorted in ascending order (smallest first) - */ - protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) { - Map<Long, Batch> batches = new HashMap<Long, Batch>(); + // The cells of a mutation are broken up into timestamp batches prior to this call (in Indexer). + long ts = m.getFamilyCellMap().values().iterator().next().iterator().next().getTimestamp(); + Batch batch = new Batch(ts); for (List<Cell> family : m.getFamilyCellMap().values()) { - List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family); - createTimestampBatchesFromKeyValues(familyKVs, batches); - } - // sort the batches - List<Batch> sorted = new ArrayList<Batch>(batches.values()); - Collections.sort(sorted, new Comparator<Batch>() { - @Override - public int compare(Batch o1, Batch o2) { - return Longs.compare(o1.getTimestamp(), o2.getTimestamp()); + List<KeyValue> kvs = KeyValueUtil.ensureKeyValues(family); + for (KeyValue kv : kvs) { + batch.add(kv); + assert(ts == kv.getTimestamp()); } - }); - return sorted; - } - - /** - * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any {@link KeyValue} with a - * timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called. - * - * @param kvs - * {@link KeyValue}s to break into batches - * @param batches - * to update with the given kvs - */ - protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs, Map<Long, Batch> batches) { - long now = EnvironmentEdgeManager.currentTime(); - byte[] nowBytes = Bytes.toBytes(now); + } - // batch kvs by timestamp - for (KeyValue kv : kvs) { - long ts = kv.getTimestamp(); - // override the timestamp to the current time, so the index and primary tables match - // all the keys with LATEST_TIMESTAMP will then be put into the same batch - if (kv.updateLatestStamp(nowBytes)) { - ts = now; - } - Batch batch = batches.get(ts); - if (batch == null) { - batch = new Batch(ts); - batches.put(ts, batch); - } - batch.add(kv); + // go through each batch of keyvalues and build separate index entries for each + boolean cleanupCurrentState = !indexMetaData.isImmutableRows(); + /* + * We have to split the work between the cleanup and the update for each group because when we update the + * current state of the row for the current batch (appending the mutations for the current batch) the next + * group will see that as the current state, which can cause a delete and a put to be created for + * the next group.
+ */ + if (addMutationsForBatch(manager, batch, state, cleanupCurrentState, indexMetaData)) { + cleanupCurrentState = false; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java index c830e54..925bcbb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java @@ -40,6 +40,7 @@ import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.LocalTableState; import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; /** * Index maintainer that maintains multiple indexes based on '{@link ColumnGroup}s'. Each group is a @@ -141,7 +142,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder { } // do the usual thing as for deletes - Collection<Batch> timeBatch = createTimestampBatchesFromMutation(p); + Collection<Batch> timeBatch = IndexManagementUtil.createTimestampBatchesFromMutation(p); LocalTableState state = new LocalTableState(env, localTable, p); for (Batch entry : timeBatch) { //just set the timestamp on the table - it already has all the future state @@ -158,7 +159,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder { */ private Collection<Batch> batchByRow(Collection<KeyValue> filtered) { Map<Long, Batch> batches = new HashMap<Long, Batch>(); - createTimestampBatchesFromKeyValues(filtered, batches); + IndexManagementUtil.createTimestampBatchesFromKeyValues(filtered, batches); return batches.values(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java index 6582c8a..697caef 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java @@ -18,22 +18,39 @@ package org.apache.phoenix.hbase.index.util; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; 
+import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException; +import org.apache.phoenix.hbase.index.covered.Batch; import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; + /** * Utility class to help manage indexes */ @@ -189,4 +206,81 @@ public class IndexManagementUtil { conf.setInt(key, value); } } + + /** + * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any {@link KeyValue} with a + * timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called. + * + * @param kvs {@link KeyValue}s to break into batches + * @param batches to update with the given kvs + */ + public static void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs, Map<Long, Batch> batches) { + // batch kvs by timestamp + for (KeyValue kv : kvs) { + long ts = kv.getTimestamp(); + Batch batch = batches.get(ts); + if (batch == null) { + batch = new Batch(ts); + batches.put(ts, batch); + } + batch.add(kv); + } + } + + /** + * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any {@link KeyValue} with a timestamp + * == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called. + * + * @param m {@link Mutation} from which to extract the {@link KeyValue}s + * @return the mutation, broken into batches and sorted in ascending order (smallest first) + */ + public static Collection<Batch> createTimestampBatchesFromMutation(Mutation m) { + Map<Long, Batch> batches = new HashMap<Long, Batch>(); + for (List<Cell> family : m.getFamilyCellMap().values()) { + List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family); + createTimestampBatchesFromKeyValues(familyKVs, batches); + } + // sort the batches + List<Batch> sorted = new ArrayList<Batch>(batches.values()); + Collections.sort(sorted, new Comparator<Batch>() { + @Override + public int compare(Batch o1, Batch o2) { + return Longs.compare(o1.getTimestamp(), o2.getTimestamp()); + } + }); + return sorted; + } + + public static Collection<? extends Mutation> flattenMutationsByTimestamp(Collection<? 
extends Mutation> mutations) { + List<Mutation> flattenedMutations = Lists.newArrayListWithExpectedSize(mutations.size() * 10); + for (Mutation m : mutations) { + byte[] row = m.getRow(); + Collection<Batch> batches = createTimestampBatchesFromMutation(m); + for (Batch batch : batches) { + Mutation mWithSameTS; + Cell firstCell = batch.getKvs().get(0); + if (KeyValue.Type.codeToType(firstCell.getTypeByte()) == KeyValue.Type.Put) { + mWithSameTS = new Put(row); + } else { + mWithSameTS = new Delete(row); + } + if (m.getAttributesMap() != null) { + for (Map.Entry<String,byte[]> entry : m.getAttributesMap().entrySet()) { + mWithSameTS.setAttribute(entry.getKey(), entry.getValue()); + } + } + for (Cell cell : batch.getKvs()) { + byte[] fam = CellUtil.cloneFamily(cell); + List<Cell> famCells = mWithSameTS.getFamilyCellMap().get(fam); + if (famCells == null) { + famCells = Lists.newArrayList(); + mWithSameTS.getFamilyCellMap().put(fam, famCells); + } + famCells.add(cell); + } + flattenedMutations.add(mWithSameTS); + } + } + return flattenedMutations; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsTest.java new file mode 100644 index 0000000..db7d838 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.hbase.index.covered; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; + +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; + +import org.apache.phoenix.hbase.index.covered.CoveredColumns; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; + +public class CoveredColumnsTest { + + private static final byte[] fam = Bytes.toBytes("fam"); + private static final byte[] qual = Bytes.toBytes("qual"); + + @Test + public void testCovering() { + ColumnReference ref = new ColumnReference(fam, qual); + CoveredColumns columns = new CoveredColumns(); + assertEquals("Should have only found a single column to cover", 1, columns + .findNonCoveredColumns(Arrays.asList(ref)).size()); + + columns.addColumn(ref); + assertEquals("Shouldn't have any columns to cover", 0, + columns.findNonCoveredColumns(Arrays.asList(ref)).size()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java new file mode 100644 index 0000000..dcf330f --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/649b737a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
new file mode 100644
index 0000000..dcf330f
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
+import org.apache.phoenix.hbase.index.covered.data.LocalTable;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.scanner.Scanner;
+import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for {@link LocalTableState}.
+ */
+public class LocalTableStateTest {
+
+  private static final byte[] row = Bytes.toBytes("row");
+  private static final byte[] fam = Bytes.toBytes("fam");
+  private static final byte[] qual = Bytes.toBytes("qual");
+  private static final byte[] val = Bytes.toBytes("val");
+  private static final long ts = 10;
+  private static final IndexMetaData indexMetaData = new IndexMetaData() {
+
+    @Override
+    public boolean isImmutableRows() {
+      return false;
+    }
+
+    @Override
+    public boolean ignoreNewerMutations() {
+      return false;
+    }
+
+  };
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCorrectOrderingWithLazyLoadingColumns() throws Exception {
+    Put m = new Put(row);
+    m.add(fam, qual, ts, val);
+    // setup mocks
+    Configuration conf = new Configuration(false);
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+    Mockito.when(env.getConfiguration()).thenReturn(conf);
+
+    Region region = Mockito.mock(Region.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
+    RegionScanner scanner = Mockito.mock(RegionScanner.class);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
+    final byte[] stored = Bytes.toBytes("stored-value");
+    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
+        KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
+        kv.setSequenceId(0);
+        list.add(kv);
+        return false;
+      }
+    });
+
+    LocalHBaseState state = new LocalTable(env);
+    LocalTableState table = new LocalTableState(env, state, m);
+    // add the kvs from the mutation
+    table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
+
+    // setup the lookup
+    ColumnReference col = new ColumnReference(fam, qual);
+    table.setCurrentTimestamp(ts);
+    // check that our value still shows up first on scan, even though this is a lazy load
+    Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+    Scanner s = p.getFirst();
+    assertEquals("Didn't get the pending mutation's value first", m.get(fam, qual).get(0), s.next());
+  }
+
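
Note: the anonymous IndexMetaData stub above (mutable rows, no skipping of newer mutations) is re-declared inline by several of the tests that follow; a small hypothetical factory, not part of this commit, would keep the stubs in one place:

    // Hypothetical helper (not in the commit): builds the two-method
    // IndexMetaData stubs these tests otherwise re-declare inline.
    private static IndexMetaData stubMetaData(final boolean immutableRows) {
      return new IndexMetaData() {
        @Override
        public boolean isImmutableRows() {
          return immutableRows;
        }

        @Override
        public boolean ignoreNewerMutations() {
          return false;
        }
      };
    }
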
+  public static final class ScannerCreatedException extends RuntimeException {
+    ScannerCreatedException(String msg) {
+      super(msg);
+    }
+  }
+
+  @Test(expected = ScannerCreatedException.class)
+  public void testScannerForMutableRows() throws Exception {
+    IndexMetaData indexMetaData = new IndexMetaData() {
+
+      @Override
+      public boolean isImmutableRows() {
+        return false;
+      }
+
+      @Override
+      public boolean ignoreNewerMutations() {
+        return false;
+      }
+
+    };
+    Put m = new Put(row);
+    m.add(fam, qual, ts, val);
+    // setup mocks
+    Configuration conf = new Configuration(false);
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+    Mockito.when(env.getConfiguration()).thenReturn(conf);
+
+    Region region = Mockito.mock(Region.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Scanner opened for mutable rows, as expected"));
+
+    LocalHBaseState state = new LocalTable(env);
+    LocalTableState table = new LocalTableState(env, state, m);
+    // add the kvs from the mutation
+    table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
+
+    // setup the lookup
+    ColumnReference col = new ColumnReference(fam, qual);
+    table.setCurrentTimestamp(ts);
+    table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+  }
+
+  @Test
+  public void testNoScannerForImmutableRows() throws Exception {
+    IndexMetaData indexMetaData = new IndexMetaData() {
+
+      @Override
+      public boolean isImmutableRows() {
+        return true;
+      }
+
+      @Override
+      public boolean ignoreNewerMutations() {
+        return false;
+      }
+
+    };
+    Put m = new Put(row);
+    m.add(fam, qual, ts, val);
+    // setup mocks
+    Configuration conf = new Configuration(false);
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+    Mockito.when(env.getConfiguration()).thenReturn(conf);
+
+    Region region = Mockito.mock(Region.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
+
+    LocalHBaseState state = new LocalTable(env);
+    LocalTableState table = new LocalTableState(env, state, m);
+    // add the kvs from the mutation
+    table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
+
+    // setup the lookup
+    ColumnReference col = new ColumnReference(fam, qual);
+    table.setCurrentTimestamp(ts);
+    // check that our value still shows up first on scan, even though this is a lazy load
+    Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+    Scanner s = p.getFirst();
+    assertEquals("Didn't get the pending mutation's value first", m.get(fam, qual).get(0), s.next());
+  }
+
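
Note: the env -> region -> scanner Mockito wiring repeats in each test of this class; a hypothetical shared helper (illustrative only, not part of the commit) could centralize it:

    // Hypothetical helper (not in the commit): the standard mock chain
    // used by the tests in this class. Requires java.io.IOException in
    // the imports, since Region.getScanner declares it.
    private static RegionCoprocessorEnvironment mockEnv(RegionScanner scanner) throws IOException {
      RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
      Region region = Mockito.mock(Region.class);
      Mockito.when(env.getRegion()).thenReturn(region);
      Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
      return env;
    }
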
+  /**
+   * Test that we correctly roll back the state of a KeyValue.
+   * @throws Exception
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCorrectRollback() throws Exception {
+    Put m = new Put(row);
+    m.add(fam, qual, ts, val);
+    // setup mocks
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+
+    Region region = Mockito.mock(Region.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
+    RegionScanner scanner = Mockito.mock(RegionScanner.class);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
+    final byte[] stored = Bytes.toBytes("stored-value");
+    final KeyValue storedKv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
+    storedKv.setSequenceId(2);
+    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
+
+        list.add(storedKv);
+        return false;
+      }
+    });
+    LocalHBaseState state = new LocalTable(env);
+    LocalTableState table = new LocalTableState(env, state, m);
+    // add the kvs from the mutation
+    KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0));
+    kv.setSequenceId(0);
+    table.addPendingUpdates(kv);
+
+    // setup the lookup
+    ColumnReference col = new ColumnReference(fam, qual);
+    table.setCurrentTimestamp(ts);
+    // check that the value is there
+    Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+    Scanner s = p.getFirst();
+    assertEquals("Didn't get the pending mutation's value first", kv, s.next());
+
+    // rollback that value
+    table.rollback(Arrays.asList(kv));
+    p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+    s = p.getFirst();
+    assertEquals("Didn't correctly rollback the row - still found it!", null, s.next());
+    Mockito.verify(env, Mockito.times(1)).getRegion();
+    Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testOnlyLoadsRequestedColumns() throws Exception {
+    // setup mocks
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+
+    Region region = Mockito.mock(Region.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
+    RegionScanner scanner = Mockito.mock(RegionScanner.class);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
+    final KeyValue storedKv =
+        new KeyValue(row, fam, qual, ts, Type.Put, Bytes.toBytes("stored-value"));
+    storedKv.setSequenceId(2);
+    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
+
+        list.add(storedKv);
+        return false;
+      }
+    });
+    LocalHBaseState state = new LocalTable(env);
+    Put pendingUpdate = new Put(row);
+    pendingUpdate.add(fam, qual, ts, val);
+    LocalTableState table = new LocalTableState(env, state, pendingUpdate);
+
+    // do the lookup for the given column
+    ColumnReference col = new ColumnReference(fam, qual);
+    table.setCurrentTimestamp(ts);
+    // check that the value is there
+    Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+    Scanner s = p.getFirst();
+    // make sure it read the table the one time
+    assertEquals("Didn't get the stored keyvalue!", storedKv, s.next());
+
+    // on the second lookup it shouldn't access the underlying table again - the cached columns
+    // should know they are done
+    p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+    s = p.getFirst();
+    assertEquals("Lost already loaded update!", storedKv, s.next());
+    Mockito.verify(env, Mockito.times(1)).getRegion();
+    Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
+  }
+
+  // TODO add test here for making sure multiple column references with the same column family don't
+  // cause an infinite loop
+}
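
Note: the TODO above could be pinned down with a test along these lines (hypothetical sketch, not part of this commit): two references sharing a column family should resolve without the state loader spinning, which a JUnit timeout can enforce.

    // Hypothetical sketch for the TODO above; completing within the
    // timeout is the assertion.
    @SuppressWarnings("unchecked")
    @Test(timeout = 10000)
    public void testSameFamilyReferencesDoNotLoop() throws Exception {
      RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
      Region region = Mockito.mock(Region.class);
      Mockito.when(env.getRegion()).thenReturn(region);
      RegionScanner scanner = Mockito.mock(RegionScanner.class);
      Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
      Mockito.when(scanner.next(Mockito.any(List.class))).thenReturn(false);

      Put m = new Put(row);
      m.add(fam, qual, ts, val);
      LocalTableState table = new LocalTableState(env, new LocalTable(env), m);
      table.setCurrentTimestamp(ts);
      // two references to the same family, different qualifiers
      table.getIndexedColumnsTableState(
          Arrays.asList(new ColumnReference(fam, qual),
              new ColumnReference(fam, Bytes.toBytes("qual2"))),
          false, false, indexMetaData);
    }
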