Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.0 1b85f63e6 -> e43d0a2c1
PHOENIX-2621 ImmutableIndexIT is failing Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e43d0a2c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e43d0a2c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e43d0a2c Branch: refs/heads/4.x-HBase-1.0 Commit: e43d0a2c1cc3f04938c989b55c7eb4b599760acc Parents: 1b85f63 Author: James Taylor <jtay...@salesforce.com> Authored: Fri Jan 22 00:58:46 2016 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Fri Jan 22 01:01:08 2016 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/index/ImmutableIndexIT.java | 70 ++++++++++---------- 1 file changed, 36 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e43d0a2c/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java index c18e4ab..ced3ac8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java @@ -72,20 +72,19 @@ import com.google.common.collect.Maps; public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { private final boolean localIndex; - private final boolean transactional; private final String tableDDLOptions; private final String tableName; private final String indexName; private final String fullTableName; private final String fullIndexName; + private volatile boolean stopThreads = false; private static String TABLE_NAME; private static String INDEX_DDL; - public static final AtomicInteger NUM_ROWS = new AtomicInteger(1); + public static final AtomicInteger NUM_ROWS = new AtomicInteger(0); public ImmutableIndexIT(boolean localIndex, boolean transactional) { this.localIndex = localIndex; - this.transactional = transactional; StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true"); if (transactional) { optionBuilder.append(", TRANSACTIONAL=true"); @@ -210,8 +209,8 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { } } - private static class UpsertRunnable implements Runnable { - private static final int NUM_ROWS_IN_BATCH = 1000; + private class UpsertRunnable implements Runnable { + private static final int NUM_ROWS_IN_BATCH = 10; private final String fullTableName; public UpsertRunnable(String fullTableName) { @@ -222,18 +221,19 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { public void run() { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - while (true) { + while (!stopThreads) { // write a large batch of rows boolean fistRowInBatch = true; - for (int i=0; i<NUM_ROWS_IN_BATCH; ++i) { - BaseTest.upsertRow(conn, fullTableName, NUM_ROWS.intValue(), fistRowInBatch); - NUM_ROWS.incrementAndGet(); + for (int i=0; i<NUM_ROWS_IN_BATCH && !stopThreads; ++i) { + BaseTest.upsertRow(conn, fullTableName, NUM_ROWS.incrementAndGet(), fistRowInBatch); fistRowInBatch = false; } conn.commit(); + Thread.sleep(10); } } catch (SQLException e) { throw new RuntimeException(e); + } catch (InterruptedException e) { } } } @@ -242,56 +242,58 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { public void testCreateIndexWhileUpsertingData() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions; - String indexDDL = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IF NOT EXISTS " + indexName + " ON " + fullTableName + String indexDDL = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + " (long_pk, varchar_pk)" + " INCLUDE (long_col1, long_col2)"; - int numThreads = 3; + int numThreads = 2; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + t.setPriority(Thread.MIN_PRIORITY); + return t; + } + }); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - conn.setAutoCommit(false); + conn.setAutoCommit(true); Statement stmt = conn.createStatement(); stmt.execute(ddl); - ExecutorService executorService = Executors.newFixedThreadPool(numThreads, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = Executors.defaultThreadFactory().newThread(r); - t.setDaemon(true); - return t; - } - }); + ResultSet rs; + rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + fullTableName); + assertTrue(rs.next()); + int dataTableRowCount = rs.getInt(1); + assertEquals(0,dataTableRowCount); + List<Future<?>> futureList = Lists.newArrayListWithExpectedSize(numThreads); for (int i =0; i<numThreads; ++i) { futureList.add(executorService.submit(new UpsertRunnable(fullTableName))); } // upsert some rows before creating the index - Thread.sleep(500); + Thread.sleep(100); // create the index try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) { - conn2.setAutoCommit(false); - Statement stmt2 = conn2.createStatement(); - stmt2.execute(indexDDL); - conn2.commit(); + conn2.createStatement().execute(indexDDL); } // upsert some rows after creating the index - Thread.sleep(100); + Thread.sleep(50); // cancel the running threads - for (Future<?> future : futureList) { - future.cancel(true); - } - executorService.shutdownNow(); - executorService.awaitTermination(30, TimeUnit.SECONDS); - Thread.sleep(100); + stopThreads = true; + executorService.shutdown(); + assertTrue(executorService.awaitTermination(30, TimeUnit.SECONDS)); - ResultSet rs; rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + fullTableName); assertTrue(rs.next()); - int dataTableRowCount = rs.getInt(1); + dataTableRowCount = rs.getInt(1); rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName); assertTrue(rs.next()); int indexTableRowCount = rs.getInt(1); assertEquals("Data and Index table should have the same number of rows ", dataTableRowCount, indexTableRowCount); + } finally { + executorService.shutdownNow(); } }