Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 a7af86303 -> 1e777b2b8


PHOENIX-4089 Prevent index from getting out of sync with data table under high 
concurrency


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8261009f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8261009f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8261009f

Branch: refs/heads/4.x-HBase-0.98
Commit: 8261009fad2b944b42bf135b4926407cf2f66cb7
Parents: a7af863
Author: James Taylor <jamestay...@apache.org>
Authored: Wed Aug 16 19:14:21 2017 -0700
Committer: James Taylor <jamestay...@apache.org>
Committed: Thu Aug 17 17:10:22 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/ConcurrentMutationsIT.java  |  56 +++++++++-
 .../phoenix/end2end/index/ImmutableIndexIT.java |  29 ++++--
 .../end2end/index/PartialIndexRebuilderIT.java  |  76 --------------
 .../org/apache/phoenix/util/TestUtilIT.java     |  89 ++++++++++++++++
 .../apache/phoenix/compile/DeleteCompiler.java  |   4 -
 .../phoenix/compile/PostIndexDDLCompiler.java   |   9 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |  64 ++----------
 .../apache/phoenix/execute/BaseQueryPlan.java   |   8 +-
 .../apache/phoenix/execute/MutationState.java   |   5 +-
 .../org/apache/phoenix/hbase/index/Indexer.java | 103 +++++++++++++++----
 .../hbase/index/builder/BaseIndexBuilder.java   |   5 +
 .../hbase/index/builder/IndexBuildManager.java  |   4 +
 .../hbase/index/builder/IndexBuilder.java       |   2 +
 .../phoenix/index/PhoenixIndexBuilder.java      |   7 +-
 .../apache/phoenix/schema/MetaDataClient.java   |  50 +++------
 .../java/org/apache/phoenix/util/Repeat.java    |  30 ++++++
 .../apache/phoenix/util/RunUntilFailure.java    |  73 +++++++++++++
 .../java/org/apache/phoenix/util/TestUtil.java  |   4 +-
 18 files changed, 390 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
index 9ed5174..d3e3761 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
@@ -50,14 +50,18 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.Repeat;
+import org.apache.phoenix.util.RunUntilFailure;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import com.google.common.collect.Maps;
 
+@RunWith(RunUntilFailure.class)
 public class ConcurrentMutationsIT extends BaseUniqueNamesOwnClusterIT {
-    private static final Random RAND = new Random();
+    private static final Random RAND = new Random(5);
     private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_"; 
 
     private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_";
     private static final int ROW_LOCK_WAIT_TIME = 10000;
@@ -225,6 +229,56 @@ public class ConcurrentMutationsIT extends 
BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
+    @Repeat(25)
+    public void testConcurrentUpserts() throws Exception {
+        int nThreads = 8;
+        final int batchSize = 200;
+        final int nRows = 51;
+        final int nIndexValues = 23;
+        final String tableName = generateUniqueName();
+        final String indexName = generateUniqueName();
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 
INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY 
(k1,k2)) STORE_NULLS=true, VERSIONS=1");
+        addDelayingCoprocessor(conn, tableName);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
tableName + "(v1)");
+        final CountDownLatch doneSignal = new CountDownLatch(nThreads);
+        Runnable[] runnables = new Runnable[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+           runnables[i] = new Runnable() {
+    
+               @Override
+               public void run() {
+                   try {
+                       Connection conn = DriverManager.getConnection(getUrl());
+                       for (int i = 0; i < 10000; i++) {
+                           boolean isNull = RAND.nextBoolean();
+                           int randInt = RAND.nextInt() % nIndexValues;
+                           conn.createStatement().execute("UPSERT INTO " + 
tableName + " VALUES (" + (i % nRows) + ", 0, " + (isNull ? null : randInt) + 
")");
+                           if ((i % batchSize) == 0) {
+                               conn.commit();
+                           }
+                       }
+                       conn.commit();
+                   } catch (SQLException e) {
+                       throw new RuntimeException(e);
+                   } finally {
+                       doneSignal.countDown();
+                   }
+               }
+                
+            };
+        }
+        for (int i = 0; i < nThreads; i++) {
+            Thread t = new Thread(runnables[i]);
+            t.start();
+        }
+        
+        assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
+        long actualRowCount = TestUtil.scrutinizeIndex(conn, tableName, 
indexName);
+        assertEquals(nRows, actualRowCount);
+    }
+
+    @Test
     public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception {
         final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName();
         final String indexName = generateUniqueName();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 06802b6..bf38c78 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -39,7 +39,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
@@ -52,6 +51,7 @@ import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -71,7 +71,6 @@ import com.google.common.collect.Maps;
 public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
 
     private final boolean localIndex;
-    private final boolean columnEncoded;
     private final String tableDDLOptions;
 
     private volatile boolean stopThreads = false;
@@ -83,7 +82,6 @@ public class ImmutableIndexIT extends 
BaseUniqueNamesOwnClusterIT {
     public ImmutableIndexIT(boolean localIndex, boolean transactional, boolean 
columnEncoded) {
         StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
         this.localIndex = localIndex;
-        this.columnEncoded = columnEncoded;
         if (!columnEncoded) {
             if (optionBuilder.length()!=0)
                 optionBuilder.append(",");
@@ -186,11 +184,12 @@ public class ImmutableIndexIT extends 
BaseUniqueNamesOwnClusterIT {
             String upsertSelect = "UPSERT INTO " + TABLE_NAME + "(varchar_pk, 
char_pk, int_pk, long_pk, decimal_pk, date_pk) " +
                     "SELECT varchar_pk||'_upsert_select', char_pk, int_pk, 
long_pk, decimal_pk, date_pk FROM "+ TABLE_NAME;
             conn.createStatement().execute(upsertSelect);
+            TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
             ResultSet rs;
             rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ 
COUNT(*) FROM " + TABLE_NAME);
             assertTrue(rs.next());
             assertEquals(440,rs.getInt(1));
-            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + 
TABLE_NAME);
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + 
indexName);
             assertTrue(rs.next());
             assertEquals(440,rs.getInt(1));
         }
@@ -207,14 +206,22 @@ public class ImmutableIndexIT extends 
BaseUniqueNamesOwnClusterIT {
             if (tableName.equalsIgnoreCase(TABLE_NAME)
                     // create the index after the second batch  
                     && Bytes.startsWith(put.getRow(), 
Bytes.toBytes("varchar200_upsert_select"))) {
-                try {
-                    Properties props = 
PropertiesUtil.deepCopy(TEST_PROPERTIES);
-                    try (Connection conn = 
DriverManager.getConnection(getUrl(), props)) {
-                        conn.createStatement().execute(INDEX_DDL);
+                Runnable r = new Runnable() {
+
+                    @Override
+                    public void run() {
+                        Properties props = 
PropertiesUtil.deepCopy(TEST_PROPERTIES);
+                        try (Connection conn = 
DriverManager.getConnection(getUrl(), props)) {
+                            // Run CREATE INDEX call in separate thread as 
otherwise we block
+                            // this thread (not a realistic scenario) and 
prevent our catchup
+                            // query from adding the missing rows.
+                            conn.createStatement().execute(INDEX_DDL);
+                        } catch (SQLException e) {
+                        } 
                     }
-                } catch (SQLException e) {
-                    throw new DoNotRetryIOException(e);
-                } 
+                    
+                };
+                new Thread(r).start();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 1e54228..8c43e3f 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -55,82 +55,6 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
ReadOnlyProps.EMPTY_PROPS);
     }
 
-
-    @Test
-    public void testRowCountIndexScrutiny() throws Throwable {
-        String schemaName = generateUniqueName();
-        String tableName = generateUniqueName();
-        String indexName = generateUniqueName();
-        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
-        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
-            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v)");
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb')");
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc')");
-            conn.commit();
-            
-            int count = conn.createStatement().executeUpdate("DELETE FROM " + 
fullIndexName + " WHERE \":K\"='a' AND \"0:V\"='ccc'");
-            assertEquals(1,count);
-            conn.commit();
-            try {
-                TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
-                fail();
-            } catch (AssertionError e) {
-                assertEquals(e.getMessage(),"Expected data table row count to 
match expected:<1> but was:<2>");
-            }
-        }
-    }
-    @Test
-    public void testExtraRowIndexScrutiny() throws Throwable {
-        String schemaName = generateUniqueName();
-        String tableName = generateUniqueName();
-        String indexName = generateUniqueName();
-        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
-        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
-            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v) INCLUDE (v2)");
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb','0')");
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc','1')");
-            conn.commit();
-            
-            conn.createStatement().executeUpdate("UPSERT INTO " + 
fullIndexName + " VALUES ('bbb','x','0')");
-            conn.commit();
-            try {
-                TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
-                fail();
-            } catch (AssertionError e) {
-                assertEquals(e.getMessage(),"Expected to find PK in data 
table: ('x')");
-            }
-        }
-    }
-    
-    @Test
-    public void testValueIndexScrutiny() throws Throwable {
-        String schemaName = generateUniqueName();
-        String tableName = generateUniqueName();
-        String indexName = generateUniqueName();
-        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
-        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
-            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v) INCLUDE (v2)");
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb','0')");
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc','1')");
-            conn.commit();
-            
-            conn.createStatement().executeUpdate("UPSERT INTO " + 
fullIndexName + " VALUES ('ccc','a','2')");
-            conn.commit();
-            try {
-                TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
-                fail();
-            } catch (AssertionError e) {
-                assertEquals(e.getMessage(),"Expected equality for V2, but 
'2'!='1'");
-            }
-        }
-    }
-
     @Test
     public void testMultiVersionsAfterFailure() throws Throwable {
         String schemaName = generateUniqueName();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java
new file mode 100644
index 0000000..d26d9f6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java
@@ -0,0 +1,89 @@
+package org.apache.phoenix.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.junit.Test;
+
+public class TestUtilIT extends ParallelStatsDisabledIT {
+    @Test
+    public void testRowCountIndexScrutiny() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc')");
+            conn.commit();
+            
+            int count = conn.createStatement().executeUpdate("DELETE FROM " + 
fullIndexName + " WHERE \":K\"='a' AND \"0:V\"='ccc'");
+            assertEquals(1,count);
+            conn.commit();
+            try {
+                TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
+                fail();
+            } catch (AssertionError e) {
+                assertEquals(e.getMessage(),"Expected data table row count to 
match expected:<2> but was:<1>");
+            }
+        }
+    }
+    @Test
+    public void testExtraRowIndexScrutiny() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v) INCLUDE (v2)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb','0')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc','1')");
+            conn.commit();
+            
+            conn.createStatement().executeUpdate("UPSERT INTO " + 
fullIndexName + " VALUES ('bbb','x','0')");
+            conn.commit();
+            try {
+                TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
+                fail();
+            } catch (AssertionError e) {
+                assertEquals(e.getMessage(),"Expected to find PK in data 
table: ('x')");
+            }
+        }
+    }
+    
+    @Test
+    public void testValueIndexScrutiny() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v) INCLUDE (v2)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb','0')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc','1')");
+            conn.commit();
+            
+            conn.createStatement().executeUpdate("UPSERT INTO " + 
fullIndexName + " VALUES ('ccc','a','2')");
+            conn.commit();
+            try {
+                TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
+                fail();
+            } catch (AssertionError e) {
+                assertEquals(e.getMessage(),"Expected equality for V2, but 
'2'!='1'");
+            }
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index de8b2ce..b2fd17c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -566,10 +566,6 @@ public class DeleteCompiler {
             } else if (runOnServer) {
                 // TODO: better abstraction
                 Scan scan = context.getScan();
-                // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations 
since there will be
-                // future dated data row mutations that will get in the way of 
generating the
-                // correct index rows on replay.
-                
scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, 
PDataType.TRUE_BYTES);
                 scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, 
QueryConstants.TRUE);
     
                 // Build an ungrouped aggregate query: select COUNT(*) from 
<table> where <where>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
index 1a667ae..b3cedf6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
@@ -53,15 +53,8 @@ public class PostIndexDDLCompiler {
 
     public MutationPlan compile(final PTable indexTable) throws SQLException {
         /*
-         * Handles:
-         * 1) Populate a newly created table with contents.
-         * 2) Activate the index by setting the INDEX_STATE to 
+         * Compiles an UPSERT SELECT command to read from the data table and 
populate the index table
          */
-        // NOTE: For first version, we would use a upsert/select to populate 
the new index table and
-        //   returns synchronously. Creating an index on an existing table 
with large amount of data
-        //   will as a result take a very very long time.
-        //   In the long term, we should change this to an asynchronous 
process to populate the index
-        //   that would allow the user to easily monitor the process of index 
creation.
         StringBuilder indexColumns = new StringBuilder();
         StringBuilder dataColumns = new StringBuilder();
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 679d953..1669ab9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -24,7 +24,6 @@ import java.sql.ParameterMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
@@ -81,8 +80,6 @@ import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.ConstraintViolationException;
 import org.apache.phoenix.schema.DelegateColumn;
 import org.apache.phoenix.schema.IllegalDataException;
-import org.apache.phoenix.schema.MetaDataClient;
-import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PName;
@@ -109,7 +106,6 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -345,10 +341,6 @@ public class UpsertCompiler {
                 
services.getProps().getBoolean(QueryServices.ENABLE_SERVER_UPSERT_SELECT,
                         
QueryServicesOptions.DEFAULT_ENABLE_SERVER_UPSERT_SELECT);
         UpsertingParallelIteratorFactory parallelIteratorFactoryToBe = null;
-        // Retry once if auto commit is off, as the meta data may
-        // be out of date. We do not retry if auto commit is on, as we
-        // update the cache up front when we create the resolver in that case.
-        boolean retryOnce = !connection.getAutoCommit();
         boolean useServerTimestampToBe = false;
         
 
@@ -512,7 +504,6 @@ public class UpsertCompiler {
             }
             sameTable = !select.isJoin()
                 && tableRefToBe.equals(selectResolver.getTables().get(0));
-            tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, 
selectResolver.getTables());
             /* We can run the upsert in a coprocessor if:
              * 1) from has only 1 table or server UPSERT SELECT is enabled
              * 2) the select query isn't doing aggregation (which requires a 
client-side final merge)
@@ -550,17 +541,12 @@ public class UpsertCompiler {
             select = SelectStatement.create(select, hint);
             // Pass scan through if same table in upsert and select so that 
projection is computed correctly
             // Use optimizer to choose the best plan
-            try {
-                QueryCompiler compiler = new QueryCompiler(statement, select, 
selectResolver, targetColumns, parallelIteratorFactoryToBe, new 
SequenceManager(statement), false);
-                queryPlanToBe = compiler.compile();
-                // This is post-fix: if the tableRef is a projected table, 
this means there are post-processing
-                // steps and parallelIteratorFactory did not take effect.
-                if (queryPlanToBe.getTableRef().getTable().getType() == 
PTableType.PROJECTED || queryPlanToBe.getTableRef().getTable().getType() == 
PTableType.SUBQUERY) {
-                    parallelIteratorFactoryToBe = null;
-                }
-            } catch (MetaDataEntityNotFoundException e) {
-                retryOnce = false; // don't retry if select clause has meta 
data entities that aren't found, as we already updated the cache
-                throw e;
+            QueryCompiler compiler = new QueryCompiler(statement, select, 
selectResolver, targetColumns, parallelIteratorFactoryToBe, new 
SequenceManager(statement), false);
+            queryPlanToBe = compiler.compile();
+            // This is post-fix: if the tableRef is a projected table, this 
means there are post-processing
+            // steps and parallelIteratorFactory did not take effect.
+            if (queryPlanToBe.getTableRef().getTable().getType() == 
PTableType.PROJECTED || queryPlanToBe.getTableRef().getTable().getType() == 
PTableType.SUBQUERY) {
+                parallelIteratorFactoryToBe = null;
             }
             nValuesToSet = queryPlanToBe.getProjector().getColumnCount();
             // Cannot auto commit if doing aggregation or topN or salted
@@ -699,10 +685,6 @@ public class UpsertCompiler {
                      */
                     final StatementContext context = queryPlan.getContext();
                     final Scan scan = context.getScan();
-                    // Propagate IGNORE_NEWER_MUTATIONS when replaying 
mutations since there will be
-                    // future dated data row mutations that will get in the 
way of generating the
-                    // correct index rows on replay.
-                    
scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, 
PDataType.TRUE_BYTES);
                     
scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE, 
UngroupedAggregateRegionObserver.serialize(projectedTable));
                     
scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, 
UngroupedAggregateRegionObserver.serialize(projectedExpressions));
                     
@@ -752,10 +734,6 @@ public class UpsertCompiler {
                                 Tuple row = iterator.next();
                                 final long mutationCount = 
(Long)aggProjector.getColumnProjector(0).getValue(row,
                                         PLong.INSTANCE, ptr);
-                                for (PTable index : getNewIndexes(table)) {
-                                    new 
MetaDataClient(connection).buildIndex(index, tableRef,
-                                            scan.getTimeRange().getMax(), 
scan.getTimeRange().getMax() + 1);
-                                }
                                 return new MutationState(maxSize, 
maxSizeBytes, connection) {
                                     @Override
                                     public long getUpdateCount() {
@@ -768,18 +746,6 @@ public class UpsertCompiler {
                             
                         }
 
-                        private List<PTable> getNewIndexes(PTable table) 
throws SQLException {
-                            List<PTable> indexes = table.getIndexes();
-                            List<PTable> newIndexes = new ArrayList<PTable>(2);
-                            PTable newTable = 
PhoenixRuntime.getTableNoCache(connection, table.getName().getString());
-                            for (PTable index : newTable.getIndexes()) {
-                                if (!indexes.contains(index)) {
-                                    newIndexes.add(index);
-                                }
-                            }
-                            return newIndexes;
-                        }
-
                         @Override
                         public ExplainPlan getExplainPlan() throws 
SQLException {
                             List<String> queryPlanSteps =  
aggPlan.getExplainPlan().getPlanSteps();
@@ -1133,24 +1099,6 @@ public class UpsertCompiler {
         return false;
     }
     
-    private TableRef adjustTimestampToMinOfSameTable(TableRef upsertRef, 
List<TableRef> selectRefs) {
-        long minTimestamp = Long.MAX_VALUE;
-        for (TableRef selectRef : selectRefs) {
-            if (selectRef.equals(upsertRef)) {
-                minTimestamp = Math.min(minTimestamp, 
selectRef.getTimeStamp());
-            }
-        }
-        if (minTimestamp != Long.MAX_VALUE) {
-            // If we found the same table is selected from that is being 
upserted to,
-            // reset the timestamp of the upsert (which controls the Put 
timestamp)
-            // to the lowest timestamp we found to ensure that the data being 
selected
-            // will not see the data being upserted. This prevents infinite 
loops
-            // like the one in PHOENIX-1257.
-            return new TableRef(upsertRef, minTimestamp);
-        }
-        return upsertRef;
-    }
-
     private static class UpdateColumnCompiler extends ExpressionCompiler {
         private PColumn column;
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index dae00fd..ac5235a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -273,12 +273,8 @@ public abstract class BaseQueryPlan implements QueryPlan {
                TimeRange scanTimeRange = scan.getTimeRange();
                Long scn = connection.getSCN();
                if (scn == null) {
-                   // If we haven't resolved the time at the beginning of 
compilation, don't
-                   // force the lookup on the server, but use 
HConstants.LATEST_TIMESTAMP instead.
-                   scn = tableRef.getTimeStamp();
-                   if (scn == QueryConstants.UNSET_TIMESTAMP) {
-                       scn = HConstants.LATEST_TIMESTAMP;
-                   }
+                       // Always use latest timestamp unless scn is set or 
transactional (see PHOENIX-4089)
+                scn = HConstants.LATEST_TIMESTAMP;
                }
                try {
                    TimeRange timeRangeToUse = 
ScanUtil.intersectTimeRange(rowTimestampRange, scanTimeRange, scn);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 05f40b1..0ce163a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -696,7 +696,6 @@ public class MutationState implements SQLCloseable {
     private long validate(TableRef tableRef, Map<ImmutableBytesPtr, 
RowMutationState> rowKeyToColumnMap) throws SQLException {
         Long scn = connection.getSCN();
         MetaDataClient client = new MetaDataClient(connection);
-        long serverTimeStamp = tableRef.getTimeStamp();
         // If we're auto committing, we've already validated the schema when 
we got the ColumnResolver,
         // so no need to do it again here.
         PTable table = tableRef.getTable();
@@ -720,7 +719,6 @@ public class MutationState implements SQLCloseable {
         } 
         long timestamp = result.getMutationTime();
         if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
-            serverTimeStamp = timestamp;
             if (result.wasUpdated()) {
                 List<PColumn> columns = 
Lists.newArrayListWithExpectedSize(table.getColumns().size());
                 for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : 
rowKeyToColumnMap.entrySet()) {
@@ -742,7 +740,7 @@ public class MutationState implements SQLCloseable {
                 }
             }
         }
-        return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP 
? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
+        return scn == null ? HConstants.LATEST_TIMESTAMP : scn;
     }
     
     private static long calculateMutationSize(List<Mutation> mutations) {
@@ -911,7 +909,6 @@ public class MutationState implements SQLCloseable {
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing 
mutations to tables")) {
             Span span = trace.getSpan();
             ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable();
-            boolean isTransactional;
             while (tableRefIterator.hasNext()) {
                 // at this point we are going through mutations for each table
                 final TableRef tableRef = tableRefIterator.next();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 2e49b25..718f034 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -35,12 +35,14 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Increment;
@@ -51,8 +53,8 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import 
org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -82,12 +84,14 @@ import 
org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceScope;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 
 /**
@@ -359,12 +363,25 @@ public class Indexer extends BaseRegionObserver {
 
   private static final OperationStatus IGNORE = new 
OperationStatus(OperationStatusCode.SUCCESS);
   private static final OperationStatus FAILURE = new 
OperationStatus(OperationStatusCode.FAILURE, "Unable to acquire row lock");
+  
+  // Assume the mutation's time stamp is a client-defined time stamp if it's not 
within
+  // a factor of ten of the current time.
+  // TODO: get rid of this and have client pass LATEST_TIMESTAMP unless an SCN 
is set
+  private static boolean isProbablyClientControlledTimeStamp(Mutation m) {
+      double ratio = EnvironmentEdgeManager.currentTimeMillis() / 
MetaDataUtil.getClientTimeStamp(m);
+      return ratio > 10 || ratio < 0.10;
+  }
+   
+  private static void setTimeStamp(KeyValue kv, byte[] tsBytes) {
+      int tsOffset = kv.getTimestampOffset();
+      System.arraycopy(tsBytes, 0, kv.getBuffer(), tsOffset, 
Bytes.SIZEOF_LONG);
+  }
 
   public void 
preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
           MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable 
{
 
       // first group all the updates for a single row into a single update to 
be processed
-      Map<ImmutableBytesPtr, MultiMutation> mutations =
+      Map<ImmutableBytesPtr, MultiMutation> mutationsMap =
               new HashMap<ImmutableBytesPtr, MultiMutation>();
           
       Durability defaultDurability = Durability.SYNC_WAL;
@@ -373,17 +390,18 @@ public class Indexer extends BaseRegionObserver {
           defaultDurability = (defaultDurability == Durability.USE_DEFAULT) ? 
                   Durability.SYNC_WAL : defaultDurability;
       }
+      /*
+       * Exclusively lock all rows so we get a consistent read
+       * while determining the index updates
+       */
       Durability durability = Durability.SKIP_WAL;
+      boolean copyMutations = false;
       for (int i = 0; i < miniBatchOp.size(); i++) {
           Mutation m = miniBatchOp.getOperation(i);
           if (this.builder.isAtomicOp(m)) {
               miniBatchOp.setOperationStatus(i, IGNORE);
               continue;
           }
-          // skip this mutation if we aren't enabling indexing
-          // unfortunately, we really should ask if the raw mutation (rather 
than the combined mutation)
-          // should be indexed, which means we need to expose another method 
on the builder. Such is the
-          // way optimization go though.
           if (this.builder.isEnabled(m)) {
               boolean success = false;
               try {
@@ -415,26 +433,73 @@ public class Indexer extends BaseRegionObserver {
               if (effectiveDurablity.ordinal() > durability.ordinal()) {
                   durability = effectiveDurablity;
               }
-    
-              // TODO: remove this code as Phoenix prevents any duplicate
-              // rows in the batch mutation from the client side 
(PHOENIX-4054).
-              // Add the mutation to the batch set
+              // Track whether or not we need to copy the mutations
               ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-              MultiMutation stored = mutations.get(row);
-              // we haven't seen this row before, so add it
-              if (stored == null) {
-                  stored = new MultiMutation(row);
-                  mutations.put(row, stored);
+              if (mutationsMap.containsKey(row)) {
+                  copyMutations = true;
+              } else {
+                  mutationsMap.put(row, null);
               }
-              stored.addAll(m);
           }
       }
-    
+
       // early exit if it turns out we don't have any edits
-      if (mutations.isEmpty()) {
+      if (mutationsMap.isEmpty()) {
           return;
       }
 
+      // If we're copying the mutations, combine them per row; otherwise keep the originals in order
+      Collection<Mutation> originalMutations;
+      Collection<? extends Mutation> mutations;
+      if (copyMutations) {
+          originalMutations = null;
+          mutations = mutationsMap.values();
+      } else {
+          originalMutations = 
Lists.newArrayListWithExpectedSize(mutationsMap.size());
+          mutations = originalMutations;
+      }
+      
+      Mutation firstMutation = miniBatchOp.getOperation(0);
+      boolean resetTimeStamp = !this.builder.isPartialRebuild(firstMutation) 
&& !isProbablyClientControlledTimeStamp(firstMutation);
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      byte[] byteNow = Bytes.toBytes(now);
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          Mutation m = miniBatchOp.getOperation(i);
+          // skip this mutation if we aren't enabling indexing
+          // unfortunately, we really should ask if the raw mutation (rather 
than the combined mutation)
+          // should be indexed, which means we need to expose another method 
on the builder. Such is the
+          // way optimizations go, though.
+          if (miniBatchOp.getOperationStatus(i) != IGNORE && 
this.builder.isEnabled(m)) {
+              if (resetTimeStamp) {
+                  // Unless we're replaying edits to rebuild the index, we 
update the time stamp
+                  // of the data table to prevent overlapping time stamps 
(which prevents index
+                  // inconsistencies as this case isn't handled correctly 
currently).
+                  for (List<Cell> family : m.getFamilyCellMap().values()) {
+                      List<KeyValue> familyKVs = 
KeyValueUtil.ensureKeyValues(family);
+                      for (KeyValue kv : familyKVs) {
+                          setTimeStamp(kv, byteNow);
+                      }
+                  }
+              }
+    
+              // Only copy mutations if we found duplicate rows
+              // (which is pretty much never for Phoenix)
+              if (copyMutations) {
+                  // Add the mutation to the batch set
+                  ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+                  MultiMutation stored = mutationsMap.get(row);
+                  // we haven't seen this row before, so add it
+                  if (stored == null) {
+                      stored = new MultiMutation(row);
+                      mutationsMap.put(row, stored);
+                  }
+                  stored.addAll(m);
+              } else {
+                  originalMutations.add(m);
+              }
+          }
+      }
+    
       // dump all the index updates into a single WAL. They will get combined 
in the end anyways, so
       // don't worry which one we get
       WALEdit edit = miniBatchOp.getWalEdit(0);
@@ -454,7 +519,7 @@ public class Indexer extends BaseRegionObserver {
 
           // get the index updates for all elements in this batch
           Collection<Pair<Mutation, byte[]>> indexUpdates =
-                  this.builder.getIndexUpdate(miniBatchOp, mutations.values());
+                  this.builder.getIndexUpdate(miniBatchOp, mutations);
 
           long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
           if (duration >= slowIndexPrepareThreshold) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index b9174b8..21350d4 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -129,4 +129,9 @@ public abstract class BaseIndexBuilder implements 
IndexBuilder {
     public boolean isStopped() {
         return this.stopped;
     }
+
+    @Override
+    public boolean isPartialRebuild(Mutation m) {
+        return false;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 0567d35..f8fb421 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -132,4 +132,8 @@ public class IndexBuildManager implements Stoppable {
   public IndexBuilder getBuilderForTesting() {
     return this.delegate;
   }
+
+  public boolean isPartialRebuild(Mutation m) throws IOException {
+    return this.delegate.isPartialRebuild(m);
+  }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index dff205a..e64a857 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -148,4 +148,6 @@ public interface IndexBuilder extends Stoppable {
    * or null if Increment does not represent an ON DUPLICATE KEY clause.
    */
   public List<Mutation> executeAtomicOp(Increment inc) throws IOException;
+
+  public boolean isPartialRebuild(Mutation m);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index f4deeaf..2823268 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -380,4 +380,9 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
     public static boolean isDupKeyIgnore(byte[] onDupKeyBytes) {
         return onDupKeyBytes != null && 
Bytes.compareTo(ON_DUP_KEY_IGNORE_BYTES, onDupKeyBytes) == 0;
     }
-}
+
+    @Override
+    public boolean isPartialRebuild(Mutation m) {
+        return PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 8b36ff7..44b24ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1245,43 +1245,6 @@ public class MetaDataClient {
         throw new IllegalStateException(); // impossible
     }
     
-    /**
-     * For new mutations only should not be used if there are deletes done in 
the data table between start time and end
-     * time passed to the method.
-     */
-    public MutationState buildIndex(PTable index, TableRef dataTableRef, long 
startTime, long EndTime)
-            throws SQLException {
-        boolean wasAutoCommit = connection.getAutoCommit();
-        try {
-            AlterIndexStatement indexStatement = FACTORY
-                    .alterIndex(
-                            FACTORY.namedTable(null,
-                                    
TableName.create(index.getSchemaName().getString(),
-                                            index.getTableName().getString())),
-                            
dataTableRef.getTable().getTableName().getString(), false, 
PIndexState.INACTIVE);
-            alterIndex(indexStatement);
-            connection.setAutoCommit(true);
-            MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, 
dataTableRef);
-            Scan scan = mutationPlan.getContext().getScan();
-            try {
-                scan.setTimeRange(startTime, EndTime);
-            } catch (IOException e) {
-                throw new SQLException(e);
-            }
-            MutationState state = 
connection.getQueryServices().updateData(mutationPlan);
-            indexStatement = FACTORY
-                    .alterIndex(
-                            FACTORY.namedTable(null,
-                                    
TableName.create(index.getSchemaName().getString(),
-                                            index.getTableName().getString())),
-                            
dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);
-            alterIndex(indexStatement);
-            return state;
-        } finally {
-            connection.setAutoCommit(wasAutoCommit);
-        }
-    }
-
     private MutationPlan getMutationPlanForBuildingIndex(PTable index, 
TableRef dataTableRef) throws SQLException {
         MutationPlan mutationPlan;
         if (index.getIndexType() == IndexType.LOCAL) {
@@ -1321,7 +1284,12 @@ public class MetaDataClient {
 
             // for global indexes on non transactional tables we might have to
             // run a second index population upsert select to handle data rows
-            // that were being written on the server while the index was 
created
+            // that were being written on the server while the index was 
created.
+            // TODO: this sleep time is really arbitrary. If any query is in 
progress
+            // while the index is being built, we're depending on this sleep
+            // waiting them out. Instead we should have a means of waiting 
until
+            // all in progress queries are complete (though I'm not sure that's
+            // feasible). See PHOENIX-4092.
             long sleepTime =
                     connection
                     .getQueryServices()
@@ -1343,6 +1311,12 @@ public class MetaDataClient {
                 // was created
                 long minTimestamp = index.getTimeStamp() - 
firstUpsertSelectTime;
                 try {
+                    // TODO: Use scn or LATEST_TIMESTAMP here? It's possible 
that a DML statement
+                    // ran and ended up with timestamps later than this time. 
If we use a later
+                    // timestamp, we'll need to run the partial index 
rebuilder here as it's
+                    // possible that the updates to the table were made (such 
as deletes) after
+                    // the scn, which would not be reflected correctly by 
this mechanism.
+                    // See PHOENIX-4092.
                     
mutationPlan.getContext().getScan().setTimeRange(minTimestamp, scn);
                 } catch (IOException e) {
                     throw new SQLException(e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/test/java/org/apache/phoenix/util/Repeat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/Repeat.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/Repeat.java
new file mode 100644
index 0000000..7c7c013
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/Repeat.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)  
+@Target({ElementType.METHOD})  
+public @interface Repeat {  
+     int value();  
+}  
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/test/java/org/apache/phoenix/util/RunUntilFailure.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/util/RunUntilFailure.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/RunUntilFailure.java
new file mode 100644
index 0000000..2b378c1
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/RunUntilFailure.java
@@ -0,0 +1,73 @@
+package org.apache.phoenix.util;
+
+import org.junit.Ignore;
+import org.junit.runner.Description;
+import org.junit.runner.notification.Failure;
+import org.junit.runner.notification.RunListener;
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.InitializationError;
+import org.junit.runners.model.Statement;
+
+public class RunUntilFailure extends BlockJUnit4ClassRunner {  
+    private boolean hasFailure;
+
+    public RunUntilFailure(Class<?> klass) throws InitializationError {  
+        super(klass);  
+    }  
+
+    @Override  
+    protected Description describeChild(FrameworkMethod method) {  
+        if (method.getAnnotation(Repeat.class) != null &&  
+                method.getAnnotation(Ignore.class) == null) {  
+            return describeRepeatTest(method);  
+        }  
+        return super.describeChild(method);  
+    }  
+
+    private Description describeRepeatTest(FrameworkMethod method) {  
+        int times = method.getAnnotation(Repeat.class).value();  
+
+        Description description = Description.createSuiteDescription(  
+                testName(method) + " [" + times + " times]",  
+                method.getAnnotations());  
+
+        for (int i = 1; i <= times; i++) {  
+            description.addChild(Description.createTestDescription(  
+                    getTestClass().getJavaClass(),  
+                    testName(method) + "-" + i));  
+        }  
+        return description;  
+    }  
+
+    @Override  
+    protected void runChild(final FrameworkMethod method, RunNotifier 
notifier) {  
+        Description description = describeChild(method);  
+
+        if (method.getAnnotation(Repeat.class) != null &&  
+                method.getAnnotation(Ignore.class) == null) {  
+            runRepeatedly(methodBlock(method), description, notifier);  
+        }  
+        super.runChild(method, notifier);  
+    }  
+
+    private void runRepeatedly(Statement statement, Description description,  
+            RunNotifier notifier) {
+        notifier.addListener(new RunListener() {
+            @Override
+            public void testFailure(Failure failure) {
+                hasFailure = true;
+            }
+        });
+        for (Description desc : description.getChildren()) {  
+            if (hasFailure) {
+                notifier.fireTestIgnored(desc);
+            } else if(!desc.isSuite()) {
+                runLeaf(statement, desc, notifier);
+            }
+        }  
+    }  
+
+}  
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8261009f/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 12b1c03..af924f1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -983,8 +983,8 @@ public class TestUtil {
         }
         
         long dcount = getRowCount(conn, fullTableName);
-        assertEquals("Expected data table row count to match", icount, dcount);
-        return icount;
+        assertEquals("Expected data table row count to match", dcount, icount);
+        return dcount;
     }
     
     private static long getRowCount(Connection conn, String tableName) throws 
SQLException {

Reply via email to