Repository: phoenix Updated Branches: refs/heads/master c92ddc451 -> ce6b891fd
PHOENIX-4089 Prevent index from getting out of sync with data table under high concurrency Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ce6b891f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ce6b891f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ce6b891f Branch: refs/heads/master Commit: ce6b891fd658f6593845d1155509d0f8a599336f Parents: c92ddc4 Author: James Taylor <jamestay...@apache.org> Authored: Wed Aug 16 19:14:21 2017 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Thu Aug 17 15:46:09 2017 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/ConcurrentMutationsIT.java | 56 +++++++++- .../phoenix/end2end/index/ImmutableIndexIT.java | 29 ++++-- .../end2end/index/PartialIndexRebuilderIT.java | 76 -------------- .../org/apache/phoenix/util/TestUtilIT.java | 89 ++++++++++++++++ .../apache/phoenix/compile/DeleteCompiler.java | 4 - .../phoenix/compile/PostIndexDDLCompiler.java | 9 +- .../apache/phoenix/compile/UpsertCompiler.java | 64 ++---------- .../apache/phoenix/execute/BaseQueryPlan.java | 8 +- .../apache/phoenix/execute/MutationState.java | 5 +- .../org/apache/phoenix/hbase/index/Indexer.java | 101 +++++++++++++++---- .../hbase/index/builder/BaseIndexBuilder.java | 5 + .../hbase/index/builder/IndexBuildManager.java | 4 + .../hbase/index/builder/IndexBuilder.java | 2 + .../phoenix/index/PhoenixIndexBuilder.java | 7 +- .../apache/phoenix/schema/MetaDataClient.java | 50 +++------ .../java/org/apache/phoenix/util/Repeat.java | 30 ++++++ .../apache/phoenix/util/RunUntilFailure.java | 73 ++++++++++++++ .../java/org/apache/phoenix/util/TestUtil.java | 4 +- 18 files changed, 389 insertions(+), 227 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java index 9ed5174..d3e3761 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java @@ -50,14 +50,18 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.Repeat; +import org.apache.phoenix.util.RunUntilFailure; import org.apache.phoenix.util.TestUtil; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; import com.google.common.collect.Maps; +@RunWith(RunUntilFailure.class) public class ConcurrentMutationsIT extends BaseUniqueNamesOwnClusterIT { - private static final Random RAND = new Random(); + private static final Random RAND = new Random(5); private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_"; private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_"; private static final int ROW_LOCK_WAIT_TIME = 10000; @@ -225,6 +229,56 @@ public class ConcurrentMutationsIT extends BaseUniqueNamesOwnClusterIT { } @Test + @Repeat(25) + public void testConcurrentUpserts() throws Exception { + int nThreads = 8; + final int batchSize = 200; + final int nRows = 51; + final int nIndexValues = 23; + final String tableName = generateUniqueName(); + final String indexName = generateUniqueName(); + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, 
v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) STORE_NULLS=true, VERSIONS=1"); + addDelayingCoprocessor(conn, tableName); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)"); + final CountDownLatch doneSignal = new CountDownLatch(nThreads); + Runnable[] runnables = new Runnable[nThreads]; + for (int i = 0; i < nThreads; i++) { + runnables[i] = new Runnable() { + + @Override + public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + for (int i = 0; i < 10000; i++) { + boolean isNull = RAND.nextBoolean(); + int randInt = RAND.nextInt() % nIndexValues; + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, " + (isNull ? null : randInt) + ")"); + if ((i % batchSize) == 0) { + conn.commit(); + } + } + conn.commit(); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + } + for (int i = 0; i < nThreads; i++) { + Thread t = new Thread(runnables[i]); + t.start(); + } + + assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS)); + long actualRowCount = TestUtil.scrutinizeIndex(conn, tableName, indexName); + assertEquals(nRows, actualRowCount); + } + + @Test public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception { final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName(); final String indexName = generateUniqueName(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java index 06802b6..bf38c78 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java +++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java @@ -39,7 +39,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; @@ -52,6 +51,7 @@ import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -71,7 +71,6 @@ import com.google.common.collect.Maps; public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { private final boolean localIndex; - private final boolean columnEncoded; private final String tableDDLOptions; private volatile boolean stopThreads = false; @@ -83,7 +82,6 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { public ImmutableIndexIT(boolean localIndex, boolean transactional, boolean columnEncoded) { StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true"); this.localIndex = localIndex; - this.columnEncoded = columnEncoded; if (!columnEncoded) { if (optionBuilder.length()!=0) optionBuilder.append(","); @@ -186,11 +184,12 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { String upsertSelect = "UPSERT INTO " + TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) " + "SELECT varchar_pk||'_upsert_select', char_pk, int_pk, long_pk, decimal_pk, date_pk FROM "+ TABLE_NAME; conn.createStatement().execute(upsertSelect); + TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE); ResultSet rs; rs = 
conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME); assertTrue(rs.next()); assertEquals(440,rs.getInt(1)); - rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + TABLE_NAME); + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexName); assertTrue(rs.next()); assertEquals(440,rs.getInt(1)); } @@ -207,14 +206,22 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { if (tableName.equalsIgnoreCase(TABLE_NAME) // create the index after the second batch && Bytes.startsWith(put.getRow(), Bytes.toBytes("varchar200_upsert_select"))) { - try { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - conn.createStatement().execute(INDEX_DDL); + Runnable r = new Runnable() { + + @Override + public void run() { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + // Run CREATE INDEX call in separate thread as otherwise we block + // this thread (not a realistic scenario) and prevent our catchup + // query from adding the missing rows. 
+ conn.createStatement().execute(INDEX_DDL); + } catch (SQLException e) { + } } - } catch (SQLException e) { - throw new DoNotRetryIOException(e); - } + + }; + new Thread(r).start(); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index 1e54228..8c43e3f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -55,82 +55,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS); } - - @Test - public void testRowCountIndexScrutiny() throws Throwable { - String schemaName = generateUniqueName(); - String tableName = generateUniqueName(); - String indexName = generateUniqueName(); - String fullTableName = SchemaUtil.getTableName(schemaName, tableName); - String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); - conn.commit(); - - int count = conn.createStatement().executeUpdate("DELETE FROM " + fullIndexName + " WHERE \":K\"='a' AND \"0:V\"='ccc'"); - 
assertEquals(1,count); - conn.commit(); - try { - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); - fail(); - } catch (AssertionError e) { - assertEquals(e.getMessage(),"Expected data table row count to match expected:<1> but was:<2>"); - } - } - } - @Test - public void testExtraRowIndexScrutiny() throws Throwable { - String schemaName = generateUniqueName(); - String tableName = generateUniqueName(); - String indexName = generateUniqueName(); - String fullTableName = SchemaUtil.getTableName(schemaName, tableName); - String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')"); - conn.commit(); - - conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('bbb','x','0')"); - conn.commit(); - try { - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); - fail(); - } catch (AssertionError e) { - assertEquals(e.getMessage(),"Expected to find PK in data table: ('x')"); - } - } - } - - @Test - public void testValueIndexScrutiny() throws Throwable { - String schemaName = generateUniqueName(); - String tableName = generateUniqueName(); - String indexName = generateUniqueName(); - String fullTableName = SchemaUtil.getTableName(schemaName, tableName); - String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) 
COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')"); - conn.commit(); - - conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('ccc','a','2')"); - conn.commit(); - try { - TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); - fail(); - } catch (AssertionError e) { - assertEquals(e.getMessage(),"Expected equality for V2, but '2'!='1'"); - } - } - } - @Test public void testMultiVersionsAfterFailure() throws Throwable { String schemaName = generateUniqueName(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java new file mode 100644 index 0000000..d26d9f6 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java @@ -0,0 +1,89 @@ +package org.apache.phoenix.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; + +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.junit.Test; + +public class TestUtilIT extends ParallelStatsDisabledIT { + @Test + public void testRowCountIndexScrutiny() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = 
DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); + conn.commit(); + + int count = conn.createStatement().executeUpdate("DELETE FROM " + fullIndexName + " WHERE \":K\"='a' AND \"0:V\"='ccc'"); + assertEquals(1,count); + conn.commit(); + try { + TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); + fail(); + } catch (AssertionError e) { + assertEquals(e.getMessage(),"Expected data table row count to match expected:<2> but was:<1>"); + } + } + } + @Test + public void testExtraRowIndexScrutiny() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')"); + conn.commit(); + + conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('bbb','x','0')"); + conn.commit(); + try { + TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); + fail(); + } catch (AssertionError e) { + 
assertEquals(e.getMessage(),"Expected to find PK in data table: ('x')"); + } + } + } + + @Test + public void testValueIndexScrutiny() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')"); + conn.commit(); + + conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('ccc','a','2')"); + conn.commit(); + try { + TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName); + fail(); + } catch (AssertionError e) { + assertEquals(e.getMessage(),"Expected equality for V2, but '2'!='1'"); + } + } + } + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index de8b2ce..b2fd17c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -566,10 +566,6 @@ public class DeleteCompiler { } else if (runOnServer) { // TODO: better abstraction Scan scan = context.getScan(); - // 
Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be - // future dated data row mutations that will get in the way of generating the - // correct index rows on replay. - scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE); // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where> http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java index 1a667ae..b3cedf6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java @@ -53,15 +53,8 @@ public class PostIndexDDLCompiler { public MutationPlan compile(final PTable indexTable) throws SQLException { /* - * Handles: - * 1) Populate a newly created table with contents. - * 2) Activate the index by setting the INDEX_STATE to + * Compiles an UPSERT SELECT command to read from the data table and populate the index table */ - // NOTE: For first version, we would use a upsert/select to populate the new index table and - // returns synchronously. Creating an index on an existing table with large amount of data - // will as a result take a very very long time. - // In the long term, we should change this to an asynchronous process to populate the index - // that would allow the user to easily monitor the process of index creation. 
StringBuilder indexColumns = new StringBuilder(); StringBuilder dataColumns = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index b99727b..0d09e9d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -24,7 +24,6 @@ import java.sql.ParameterMetaData; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; -import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; import java.util.Collections; @@ -81,8 +80,6 @@ import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.ConstraintViolationException; import org.apache.phoenix.schema.DelegateColumn; import org.apache.phoenix.schema.IllegalDataException; -import org.apache.phoenix.schema.MetaDataClient; -import org.apache.phoenix.schema.MetaDataEntityNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; import org.apache.phoenix.schema.PName; @@ -109,7 +106,6 @@ import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; -import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -345,10 +341,6 @@ public class UpsertCompiler { services.getProps().getBoolean(QueryServices.ENABLE_SERVER_UPSERT_SELECT, QueryServicesOptions.DEFAULT_ENABLE_SERVER_UPSERT_SELECT); UpsertingParallelIteratorFactory parallelIteratorFactoryToBe = null; - // Retry once if auto commit is off, as 
the meta data may - // be out of date. We do not retry if auto commit is on, as we - // update the cache up front when we create the resolver in that case. - boolean retryOnce = !connection.getAutoCommit(); boolean useServerTimestampToBe = false; @@ -512,7 +504,6 @@ public class UpsertCompiler { } sameTable = !select.isJoin() && tableRefToBe.equals(selectResolver.getTables().get(0)); - tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables()); /* We can run the upsert in a coprocessor if: * 1) from has only 1 table or server UPSERT SELECT is enabled * 2) the select query isn't doing aggregation (which requires a client-side final merge) @@ -550,17 +541,12 @@ public class UpsertCompiler { select = SelectStatement.create(select, hint); // Pass scan through if same table in upsert and select so that projection is computed correctly // Use optimizer to choose the best plan - try { - QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false); - queryPlanToBe = compiler.compile(); - // This is post-fix: if the tableRef is a projected table, this means there are post-processing - // steps and parallelIteratorFactory did not take effect. 
- if (queryPlanToBe.getTableRef().getTable().getType() == PTableType.PROJECTED || queryPlanToBe.getTableRef().getTable().getType() == PTableType.SUBQUERY) { - parallelIteratorFactoryToBe = null; - } - } catch (MetaDataEntityNotFoundException e) { - retryOnce = false; // don't retry if select clause has meta data entities that aren't found, as we already updated the cache - throw e; + QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false); + queryPlanToBe = compiler.compile(); + // This is post-fix: if the tableRef is a projected table, this means there are post-processing + // steps and parallelIteratorFactory did not take effect. + if (queryPlanToBe.getTableRef().getTable().getType() == PTableType.PROJECTED || queryPlanToBe.getTableRef().getTable().getType() == PTableType.SUBQUERY) { + parallelIteratorFactoryToBe = null; } nValuesToSet = queryPlanToBe.getProjector().getColumnCount(); // Cannot auto commit if doing aggregation or topN or salted @@ -699,10 +685,6 @@ public class UpsertCompiler { */ final StatementContext context = queryPlan.getContext(); final Scan scan = context.getScan(); - // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be - // future dated data row mutations that will get in the way of generating the - // correct index rows on replay. 
- scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE, UngroupedAggregateRegionObserver.serialize(projectedTable)); scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions)); @@ -752,10 +734,6 @@ public class UpsertCompiler { Tuple row = iterator.next(); final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr); - for (PTable index : getNewIndexes(table)) { - new MetaDataClient(connection).buildIndex(index, tableRef, - scan.getTimeRange().getMax(), scan.getTimeRange().getMax() + 1); - } return new MutationState(maxSize, maxSizeBytes, connection) { @Override public long getUpdateCount() { @@ -768,18 +746,6 @@ public class UpsertCompiler { } - private List<PTable> getNewIndexes(PTable table) throws SQLException { - List<PTable> indexes = table.getIndexes(); - List<PTable> newIndexes = new ArrayList<PTable>(2); - PTable newTable = PhoenixRuntime.getTableNoCache(connection, table.getName().getString()); - for (PTable index : newTable.getIndexes()) { - if (!indexes.contains(index)) { - newIndexes.add(index); - } - } - return newIndexes; - } - @Override public ExplainPlan getExplainPlan() throws SQLException { List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps(); @@ -1133,24 +1099,6 @@ public class UpsertCompiler { return false; } - private TableRef adjustTimestampToMinOfSameTable(TableRef upsertRef, List<TableRef> selectRefs) { - long minTimestamp = Long.MAX_VALUE; - for (TableRef selectRef : selectRefs) { - if (selectRef.equals(upsertRef)) { - minTimestamp = Math.min(minTimestamp, selectRef.getTimeStamp()); - } - } - if (minTimestamp != Long.MAX_VALUE) { - // If we found the same table is selected from that is being upserted to, - // reset the timestamp of the upsert (which controls the Put timestamp) - // to the lowest timestamp we 
found to ensure that the data being selected - // will not see the data being upserted. This prevents infinite loops - // like the one in PHOENIX-1257. - return new TableRef(upsertRef, minTimestamp); - } - return upsertRef; - } - private static class UpdateColumnCompiler extends ExpressionCompiler { private PColumn column; http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index 881a033..e6e7b97 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -279,12 +279,8 @@ public abstract class BaseQueryPlan implements QueryPlan { TimeRange scanTimeRange = scan.getTimeRange(); Long scn = connection.getSCN(); if (scn == null) { - // If we haven't resolved the time at the beginning of compilation, don't - // force the lookup on the server, but use HConstants.LATEST_TIMESTAMP instead. 
- scn = tableRef.getTimeStamp(); - if (scn == QueryConstants.UNSET_TIMESTAMP) { - scn = HConstants.LATEST_TIMESTAMP; - } + // Always use latest timestamp unless scn is set or transactional (see PHOENIX-4089) + scn = HConstants.LATEST_TIMESTAMP; } try { TimeRange timeRangeToUse = ScanUtil.intersectTimeRange(rowTimestampRange, scanTimeRange, scn); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 024ee4f..1be0482 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -695,7 +695,6 @@ public class MutationState implements SQLCloseable { private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException { Long scn = connection.getSCN(); MetaDataClient client = new MetaDataClient(connection); - long serverTimeStamp = tableRef.getTimeStamp(); // If we're auto committing, we've already validated the schema when we got the ColumnResolver, // so no need to do it again here. PTable table = tableRef.getTable(); @@ -719,7 +718,6 @@ public class MutationState implements SQLCloseable { } long timestamp = result.getMutationTime(); if (timestamp != QueryConstants.UNSET_TIMESTAMP) { - serverTimeStamp = timestamp; if (result.wasUpdated()) { List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size()); for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) { @@ -741,7 +739,7 @@ public class MutationState implements SQLCloseable { } } } - return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? 
HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn; + return scn == null ? HConstants.LATEST_TIMESTAMP : scn; } private static long calculateMutationSize(List<Mutation> mutations) { @@ -910,7 +908,6 @@ public class MutationState implements SQLCloseable { try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) { Span span = trace.getSpan(); ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); - boolean isTransactional; while (tableRefIterator.hasNext()) { // at this point we are going through mutations for each table final TableRef tableRef = tableRefIterator.next(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 8523977..be475b7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Increment; @@ -85,9 +87,11 @@ import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PropertiesUtil; import 
org.apache.phoenix.util.ServerUtil; +import com.google.common.collect.Lists; import com.google.common.collect.Multimap; /** @@ -356,12 +360,25 @@ public class Indexer extends BaseRegionObserver { private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS); private static final OperationStatus FAILURE = new OperationStatus(OperationStatusCode.FAILURE, "Unable to acquire row lock"); + + // Assume the time stamp of a mutation is a client-defined time stamp if it's not within + // a factor of ten of the current time (floating-point division, so both bounds apply). + // TODO: get rid of this and have client pass LATEST_TIMESTAMP unless an SCN is set + private static boolean isProbablyClientControlledTimeStamp(Mutation m) { + double ratio = (double) EnvironmentEdgeManager.currentTimeMillis() / MetaDataUtil.getClientTimeStamp(m); + return ratio > 10 || ratio < 0.10; + } + + private static void setTimeStamp(KeyValue kv, byte[] tsBytes) { + int tsOffset = kv.getTimestampOffset(); + System.arraycopy(tsBytes, 0, kv.getBuffer(), tsOffset, Bytes.SIZEOF_LONG); + } public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable { // first group all the updates for a single row into a single update to be processed - Map<ImmutableBytesPtr, MultiMutation> mutations = + Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<ImmutableBytesPtr, MultiMutation>(); Durability defaultDurability = Durability.SYNC_WAL; @@ -370,17 +387,18 @@ public class Indexer extends BaseRegionObserver { defaultDurability = (defaultDurability == Durability.USE_DEFAULT) ?
Durability.SYNC_WAL : defaultDurability; } + /* + * Exclusively lock all rows so we get a consistent read + * while determining the index updates + */ Durability durability = Durability.SKIP_WAL; + boolean copyMutations = false; for (int i = 0; i < miniBatchOp.size(); i++) { Mutation m = miniBatchOp.getOperation(i); if (this.builder.isAtomicOp(m)) { miniBatchOp.setOperationStatus(i, IGNORE); continue; } - // skip this mutation if we aren't enabling indexing - // unfortunately, we really should ask if the raw mutation (rather than the combined mutation) - // should be indexed, which means we need to expose another method on the builder. Such is the - // way optimization go though. if (this.builder.isEnabled(m)) { boolean success = false; try { @@ -412,26 +430,73 @@ public class Indexer extends BaseRegionObserver { if (effectiveDurablity.ordinal() > durability.ordinal()) { durability = effectiveDurablity; } - - // TODO: remove this code as Phoenix prevents any duplicate - // rows in the batch mutation from the client side (PHOENIX-4054). - // Add the mutation to the batch set + // Track whether or not we need to copy the mutations (only when a row occurs more than once in the batch) ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); - MultiMutation stored = mutations.get(row); - // we haven't seen this row before, so add it - if (stored == null) { - stored = new MultiMutation(row); - mutations.put(row, stored); + if (mutationsMap.containsKey(row)) { + copyMutations = true; + } else { + mutationsMap.put(row, null); } - stored.addAll(m); } } - + // early exit if it turns out we don't have any edits - if (mutations.isEmpty()) { + if (mutationsMap.isEmpty()) { return; } + // If we're copying the mutations, index the combined per-row MultiMutations; otherwise index the original mutations as-is + Collection<Mutation> originalMutations; + Collection<?
extends Mutation> mutations; + if (copyMutations) { + originalMutations = null; + mutations = mutationsMap.values(); + } else { + originalMutations = Lists.newArrayListWithExpectedSize(mutationsMap.size()); + mutations = originalMutations; + } + + Mutation firstMutation = miniBatchOp.getOperation(0); + boolean resetTimeStamp = !this.builder.isPartialRebuild(firstMutation) && !isProbablyClientControlledTimeStamp(firstMutation); + long now = EnvironmentEdgeManager.currentTimeMillis(); + byte[] byteNow = Bytes.toBytes(now); + for (int i = 0; i < miniBatchOp.size(); i++) { + Mutation m = miniBatchOp.getOperation(i); + // skip this mutation if we aren't enabling indexing + // unfortunately, we really should ask if the raw mutation (rather than the combined mutation) + // should be indexed, which means we need to expose another method on the builder. Such is the + // way optimizations go, though. + if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) { + if (resetTimeStamp) { + // Unless we're replaying edits to rebuild the index, we update the time stamp + // of the data table to prevent overlapping time stamps (which prevents index + // inconsistencies as this case isn't handled correctly currently). + for (List<Cell> family : m.getFamilyCellMap().values()) { + List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family); + for (KeyValue kv : familyKVs) { + setTimeStamp(kv, byteNow); + } + } + } + + // Only copy mutations if we found duplicate rows + // (which is pretty much never for Phoenix) + if (copyMutations) { + // Add the mutation to the batch set + ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); + MultiMutation stored = mutationsMap.get(row); + // we haven't seen this row before, so add it + if (stored == null) { + stored = new MultiMutation(row); + mutationsMap.put(row, stored); + } + stored.addAll(m); + } else { + originalMutations.add(m); + } + } + } + // dump all the index updates into a single WAL.
They will get combined in the end anyways, so // don't worry which one we get WALEdit edit = miniBatchOp.getWalEdit(0); @@ -451,7 +516,7 @@ public class Indexer extends BaseRegionObserver { // get the index updates for all elements in this batch Collection<Pair<Mutation, byte[]>> indexUpdates = - this.builder.getIndexUpdate(miniBatchOp, mutations.values()); + this.builder.getIndexUpdate(miniBatchOp, mutations); long duration = EnvironmentEdgeManager.currentTimeMillis() - start; http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java index b9174b8..21350d4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java @@ -129,4 +129,9 @@ public abstract class BaseIndexBuilder implements IndexBuilder { public boolean isStopped() { return this.stopped; } + + @Override + public boolean isPartialRebuild(Mutation m) { + return false; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java index 0567d35..f8fb421 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java @@ 
-132,4 +132,8 @@ public class IndexBuildManager implements Stoppable { public IndexBuilder getBuilderForTesting() { return this.delegate; } + + public boolean isPartialRebuild(Mutation m) throws IOException { + return this.delegate.isPartialRebuild(m); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java index dff205a..e64a857 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java @@ -148,4 +148,6 @@ public interface IndexBuilder extends Stoppable { * or null if Increment does not represent an ON DUPLICATE KEY clause. 
*/ public List<Mutation> executeAtomicOp(Increment inc) throws IOException; + + public boolean isPartialRebuild(Mutation m); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index acb139e..109dfef 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -381,4 +381,9 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder { public static boolean isDupKeyIgnore(byte[] onDupKeyBytes) { return onDupKeyBytes != null && Bytes.compareTo(ON_DUP_KEY_IGNORE_BYTES, onDupKeyBytes) == 0; } -} + + @Override + public boolean isPartialRebuild(Mutation m) { + return PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 69cd814..1499010 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1244,43 +1244,6 @@ public class MetaDataClient { throw new IllegalStateException(); // impossible } - /** - * For new mutations only should not be used if there are deletes done in the data table between start time and end - * time passed to the method. 
- */ - public MutationState buildIndex(PTable index, TableRef dataTableRef, long startTime, long EndTime) - throws SQLException { - boolean wasAutoCommit = connection.getAutoCommit(); - try { - AlterIndexStatement indexStatement = FACTORY - .alterIndex( - FACTORY.namedTable(null, - TableName.create(index.getSchemaName().getString(), - index.getTableName().getString())), - dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE); - alterIndex(indexStatement); - connection.setAutoCommit(true); - MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, dataTableRef); - Scan scan = mutationPlan.getContext().getScan(); - try { - scan.setTimeRange(startTime, EndTime); - } catch (IOException e) { - throw new SQLException(e); - } - MutationState state = connection.getQueryServices().updateData(mutationPlan); - indexStatement = FACTORY - .alterIndex( - FACTORY.namedTable(null, - TableName.create(index.getSchemaName().getString(), - index.getTableName().getString())), - dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE); - alterIndex(indexStatement); - return state; - } finally { - connection.setAutoCommit(wasAutoCommit); - } - } - private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException { MutationPlan mutationPlan; if (index.getIndexType() == IndexType.LOCAL) { @@ -1320,7 +1283,12 @@ public class MetaDataClient { // for global indexes on non transactional tables we might have to // run a second index population upsert select to handle data rows - // that were being written on the server while the index was created + // that were being written on the server while the index was created. + // TODO: this sleep time is really arbitrary. If any query is in progress + // while the index is being built, we're depending on this sleep + // waiting them out. 
Instead we should have a means of waiting until + // all in progress queries are complete (though I'm not sure that's + // feasible). See PHOENIX-4092. long sleepTime = connection .getQueryServices() @@ -1342,6 +1310,12 @@ public class MetaDataClient { // was created long minTimestamp = index.getTimeStamp() - firstUpsertSelectTime; try { + // TODO: Use scn or LATEST_TIMESTAMP here? It's possible that a DML statement + // ran and ended up with timestamps later than this time. If we use a later + // timestamp, we'll need to run the partial index rebuilder here as it's + // possible that the updates to the table were made (such as deletes) after + // the scn, which would not be reflected correctly by this mechanism. + // See PHOENIX-4092. mutationPlan.getContext().getScan().setTimeRange(minTimestamp, scn); } catch (IOException e) { throw new SQLException(e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/test/java/org/apache/phoenix/util/Repeat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/Repeat.java b/phoenix-core/src/test/java/org/apache/phoenix/util/Repeat.java new file mode 100644 index 0000000..7c7c013 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/Repeat.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD}) +public @interface Repeat { + int value(); +} + http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/test/java/org/apache/phoenix/util/RunUntilFailure.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/RunUntilFailure.java b/phoenix-core/src/test/java/org/apache/phoenix/util/RunUntilFailure.java new file mode 100644 index 0000000..2b378c1 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/RunUntilFailure.java @@ -0,0 +1,73 @@ +package org.apache.phoenix.util; + +import org.junit.Ignore; +import org.junit.runner.Description; +import org.junit.runner.notification.Failure; +import org.junit.runner.notification.RunListener; +import org.junit.runner.notification.RunNotifier; +import org.junit.runners.BlockJUnit4ClassRunner; +import org.junit.runners.model.FrameworkMethod; +import org.junit.runners.model.InitializationError; +import org.junit.runners.model.Statement; + +public class RunUntilFailure extends BlockJUnit4ClassRunner { + private boolean hasFailure; + + public RunUntilFailure(Class<?> klass) throws InitializationError { + super(klass); + } + + @Override + protected Description describeChild(FrameworkMethod method) { + if
(method.getAnnotation(Repeat.class) != null && + method.getAnnotation(Ignore.class) == null) { + return describeRepeatTest(method); + } + return super.describeChild(method); + } + + private Description describeRepeatTest(FrameworkMethod method) { + int times = method.getAnnotation(Repeat.class).value(); + + Description description = Description.createSuiteDescription( + testName(method) + " [" + times + " times]", + method.getAnnotations()); + + for (int i = 1; i <= times; i++) { + description.addChild(Description.createTestDescription( + getTestClass().getJavaClass(), + testName(method) + "-" + i)); + } + return description; + } + + @Override + protected void runChild(final FrameworkMethod method, RunNotifier notifier) { + Description description = describeChild(method); + if (method.getAnnotation(Repeat.class) != null && + method.getAnnotation(Ignore.class) == null) { + runRepeatedly(methodBlock(method), description, notifier); + return; // every repetition already ran as its own leaf; don't run once more via super + } + super.runChild(method, notifier); + } + + private void runRepeatedly(Statement statement, Description description, + RunNotifier notifier) { + notifier.addListener(new RunListener() { + @Override + public void testFailure(Failure failure) { + hasFailure = true; + } + }); + for (Description desc : description.getChildren()) { + if (hasFailure) { + notifier.fireTestIgnored(desc); + } else if(!desc.isSuite()) { + runLeaf(statement, desc, notifier); + } + } + } + +} + http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce6b891f/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 12b1c03..af924f1 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -983,8 +983,8 @@ public class TestUtil { } long dcount =
getRowCount(conn, fullTableName); - assertEquals("Expected data table row count to match", icount, dcount); - return icount; + assertEquals("Expected data table row count to match", dcount, icount); + return dcount; } private static long getRowCount(Connection conn, String tableName) throws SQLException {