Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 d55bc82a0 -> 03152d03e


PHOENIX-4057 Do not issue index updates for out of order mutation


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/31edca2c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/31edca2c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/31edca2c

Branch: refs/heads/4.x-HBase-1.2
Commit: 31edca2c15fddc42df32ae1a7d7c4736a29e7924
Parents: d55bc82
Author: James Taylor <jamestay...@apache.org>
Authored: Tue Aug 1 16:41:01 2017 -0700
Committer: James Taylor <jamestay...@apache.org>
Committed: Wed Aug 2 16:04:23 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/ConcurrentMutationsIT.java  |   3 -
 .../phoenix/end2end/OutOfOrderMutationsIT.java  | 561 +++++++++++++++++++
 .../example/EndToEndCoveredIndexingIT.java      |   6 +-
 .../hbase/index/covered/NonTxIndexBuilder.java  |  75 ++-
 4 files changed, 613 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/31edca2c/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
index 19cb70e..9ed5174 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
@@ -52,7 +52,6 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
@@ -80,7 +79,6 @@ public class ConcurrentMutationsIT extends 
BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
-    @Ignore
     public void testSynchronousDeletesAndUpsertValues() throws Exception {
         final String tableName = generateUniqueName();
         final String indexName = generateUniqueName();
@@ -164,7 +162,6 @@ public class ConcurrentMutationsIT extends 
BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
-    @Ignore
     public void testConcurrentDeletesAndUpsertValues() throws Exception {
         final String tableName = generateUniqueName();
         final String indexName = generateUniqueName();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31edca2c/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
new file mode 100644
index 0000000..3cf7336
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
+    @Test
+    public void testOutOfOrderDelete() throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        long ts = 1000;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);     
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 
CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, CONSTRAINT pk PRIMARY KEY 
(k1,k2)) COLUMN_ENCODED_BYTES = 0");
+        conn.close();
+
+        ts = 1010;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
tableName + "(k2,k1,ts)");
+        conn.close();
+        
+        ts = 1020;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);        
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " VALUES('aa','aa',?)");
+        stmt.setTimestamp(1, new Timestamp(1000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1040;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("DELETE FROM " + tableName + " WHERE 
k1='aa'");
+        conn.commit();
+        conn.close();
+        
+        ts = 1030;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        stmt = conn.prepareStatement("UPSERT INTO " + tableName + " 
VALUES('aa','aa',?)");
+        stmt.setTimestamp(1, new Timestamp(2000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1050;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        
+        long count1 = getRowCount(conn, tableName);
+        long count2 = getRowCount(conn, indexName);
+        assertEquals(0, count1);
+        assertEquals(0, count2);
+        conn.close();
+        
+        assertNoTimeStampAt(conn, indexName, 1030);
+        
+        /**
+         *
+            ************ dumping T000001;hconnection-0x2b137c55 **************
+            aaaa/0:/1040/DeleteFamily/vlen=0/seqid=0
+            aaaa/0:TS/1030/Put/vlen=12/seqid=0
+            aaaa/0:TS/1020/Put/vlen=12/seqid=0
+            aaaa/0:_0/1030/Put/vlen=1/seqid=0
+            aaaa/0:_0/1020/Put/vlen=1/seqid=0
+            -----------------------------------------------
+            ************ dumping T000002;hconnection-0x2b137c55 **************
+            aaaa\xC2\x0B/0:/1040/DeleteFamily/vlen=0/seqid=0
+            aaaa\xC2\x0B/0:_0/1020/Put/vlen=2/seqid=0
+            -----------------------------------------------
+         */
+    }
+
+    private static void assertNoTimeStampAt(Connection conn, String tableName, 
long ts) throws SQLException, IOException {
+        Scan scan = new Scan();
+        scan.setRaw(true);
+        scan.setMaxVersions();
+        HTableInterface indexHTable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName));
+        ResultScanner scanner = indexHTable.getScanner(scan);
+        Result result;
+        while ((result = scanner.next()) != null) {
+            CellScanner cellScanner = result.cellScanner();
+            while (cellScanner.advance()) {
+                assertNotEquals(ts, cellScanner.current().getTimestamp());
+            }
+        }
+    }
+    
+    @Test
+    public void testOutOfOrderUpsert() throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        long ts = 1000;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);     
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 
CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, CONSTRAINT pk PRIMARY KEY 
(k1,k2)) COLUMN_ENCODED_BYTES = 0");
+        conn.close();
+
+        ts = 1010;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
tableName + "(k2,k1,ts)");
+        conn.close();
+        
+        ts = 1020;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);        
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " VALUES('aa','aa',?)");
+        stmt.setTimestamp(1, new Timestamp(1000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1040;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        stmt = conn.prepareStatement("UPSERT INTO " + tableName + " 
VALUES('aa','aa',?)");
+        Timestamp expectedTimestamp = new Timestamp(3000L);
+        stmt.setTimestamp(1, expectedTimestamp);
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1030;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        stmt = conn.prepareStatement("UPSERT INTO " + tableName + " 
VALUES('aa','aa',?)");
+        stmt.setTimestamp(1, new Timestamp(2000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1050;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        
+        long count1 = getRowCount(conn, tableName);
+        long count2 = getRowCount(conn, indexName);
+        assertEquals("Table should have 1 row", 1, count1);
+        assertEquals("Index should have 1 row", 1, count2);
+        conn.close();
+        
+        ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ 
NO_INDEX */ ts FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals(expectedTimestamp, rs.getTimestamp(1));
+        assertFalse(rs.next());
+        
+        rs = conn.createStatement().executeQuery("SELECT \"0:TS\" FROM " + 
indexName);
+        assertTrue(rs.next());
+        assertEquals(expectedTimestamp, rs.getTimestamp(1));
+        assertFalse(rs.next());
+
+        assertNoTimeStampAt(conn, indexName, 1030);
+
+        /**
+         *
+            ************ dumping T000001;hconnection-0x3e7a21d **************
+            aaaa/0:TS/1040/Put/vlen=12/seqid=0
+            aaaa/0:TS/1030/Put/vlen=12/seqid=0
+            aaaa/0:TS/1020/Put/vlen=12/seqid=0
+            aaaa/0:_0/1040/Put/vlen=1/seqid=0
+            aaaa/0:_0/1030/Put/vlen=1/seqid=0
+            aaaa/0:_0/1020/Put/vlen=1/seqid=0
+            -----------------------------------------------
+            ************ dumping T000002;hconnection-0x3e7a21d **************
+            aaaa\xC2\x0B/0:/1040/DeleteFamily/vlen=0/seqid=0
+            aaaa\xC2\x0B/0:_0/1020/Put/vlen=2/seqid=0
+            aaaa\xC2\x1F/0:_0/1040/Put/vlen=2/seqid=0
+            -----------------------------------------------
+         */
+    }
+
+    private static long getRowCount(Connection conn, String tableName) throws 
SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ 
NO_INDEX */ count(*) FROM " + tableName);
+        assertTrue(rs.next());
+        return rs.getLong(1);
+    }
+
+    @Test
+    public void testSetIndexedColumnToNull() throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        long ts = 1000;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);     
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 
CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, CONSTRAINT pk PRIMARY KEY 
(k1,k2)) COLUMN_ENCODED_BYTES = 0");
+        conn.close();
+
+        ts = 1010;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
tableName + "(k2,k1,ts)");
+        conn.close();
+        
+        ts = 1020;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);        
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " VALUES('aa','aa',?)");
+        stmt.setTimestamp(1, new Timestamp(1000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1040;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        stmt = conn.prepareStatement("UPSERT INTO " + tableName + " 
VALUES('aa','aa', ?)");
+        Timestamp expectedTimestamp = new Timestamp(3000L);
+        stmt.setTimestamp(1, expectedTimestamp);
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1030;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        stmt = conn.prepareStatement("UPSERT INTO " + tableName + " 
VALUES('aa','aa',null)");
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1050;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        
+        
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
+        
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
+
+        long count1 = getRowCount(conn, tableName);
+        long count2 = getRowCount(conn, indexName);
+        assertEquals("Table should have 1 row", 1, count1);
+        assertEquals("Index should have 1 row", 1, count2);
+        conn.close();
+        
+        ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ 
NO_INDEX */ ts FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals(expectedTimestamp, rs.getTimestamp(1));
+        assertFalse(rs.next());
+        
+        rs = conn.createStatement().executeQuery("SELECT \"0:TS\" FROM " + 
indexName);
+        assertTrue(rs.next());
+        assertEquals(expectedTimestamp, rs.getTimestamp(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void testSetIndexedColumnToNull2() throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        long ts = 1000;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);     
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 
CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, CONSTRAINT pk PRIMARY KEY 
(k1,k2)) COLUMN_ENCODED_BYTES = 0");
+        conn.close();
+
+        ts = 1010;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
tableName + "(k2,k1,ts)");
+        conn.close();
+        
+        ts = 1020;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);        
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " VALUES('aa','aa',?)");
+        stmt.setTimestamp(1, new Timestamp(1000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1040;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        stmt = conn.prepareStatement("UPSERT INTO " + tableName + " 
VALUES('aa','aa',?)");
+        Timestamp expectedTimestamp = null;
+        stmt.setTimestamp(1, expectedTimestamp);
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1030;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        stmt = conn.prepareStatement("UPSERT INTO " + tableName + " 
VALUES('aa','aa', ?)");
+        stmt.setTimestamp(1, new Timestamp(3000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1050;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        
+        
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
+        
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
+
+        long count1 = getRowCount(conn, tableName);
+        long count2 = getRowCount(conn, indexName);
+        assertEquals("Table should have 1 row", 1, count1);
+        assertEquals("Index should have 1 row", 1, count2);
+        conn.close();
+        
+        ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ 
NO_INDEX */ ts FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals(expectedTimestamp, rs.getTimestamp(1));
+        assertFalse(rs.next());
+        
+        rs = conn.createStatement().executeQuery("SELECT \"0:TS\" FROM " + 
indexName);
+        assertTrue(rs.next());
+        assertEquals(expectedTimestamp, rs.getTimestamp(1));
+        assertFalse(rs.next());
+    }    
+    
+    @Test
+    @Ignore("PHOENIX-4058 Generate correct index updates when DeleteColumn 
processed before Put with same timestamp")
+    public void testSetIndexedColumnToNullAndValueAtSameTS() throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        long ts = 1000;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);     
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 
CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, V VARCHAR, V2 VARCHAR, 
CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0");
+        conn.close();
+
+        ts = 1010;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
tableName + "(k2,k1,ts) INCLUDE (V, v2)");
+        conn.close();
+        
+        ts = 1020;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);        
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " VALUES('aa','aa',?, '0')");
+        stmt.setTimestamp(1, new Timestamp(1000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        Timestamp expectedTimestamp;
+        ts = 1040;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        stmt = conn.prepareStatement("UPSERT INTO " + tableName + " 
VALUES('aa','aa',?, null)");
+        expectedTimestamp = null;
+        stmt.setTimestamp(1, expectedTimestamp);
+        stmt.executeUpdate();
+        conn.commit();
+        stmt.setTimestamp(1, new Timestamp(3000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1050;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        
+        
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
+        
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
+
+        long count1 = getRowCount(conn, tableName);
+        long count2 = getRowCount(conn, indexName);
+        assertEquals("Table should have 1 row", 1, count1);
+        assertEquals("Index should have 1 row", 1, count2);
+        conn.close();
+        
+        ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ 
NO_INDEX */ ts,v FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals(expectedTimestamp, rs.getTimestamp(1));
+        assertEquals(null, rs.getString(2));
+        assertFalse(rs.next());
+        
+        rs = conn.createStatement().executeQuery("SELECT \"0:TS\", \"0:V\" 
FROM " + indexName);
+        assertTrue(rs.next());
+        assertEquals(expectedTimestamp, rs.getTimestamp(1));
+        assertEquals(null, rs.getString(2));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void testSetIndexedColumnToNullAndValueAtSameTSWithStoreNulls1() 
throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        long ts = 1000;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);     
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 
CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, V VARCHAR, V2 VARCHAR, 
CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+        conn.close();
+
+        ts = 1010;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
tableName + "(k2,k1,ts) INCLUDE (V, v2)");
+        conn.close();
+        
+        ts = 1020;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);        
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " VALUES('aa','aa',?, '0')");
+        stmt.setTimestamp(1, new Timestamp(1000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        Timestamp expectedTimestamp;
+        ts = 1040;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        stmt = conn.prepareStatement("UPSERT INTO " + tableName + " 
VALUES('aa','aa',?, null)");
+        expectedTimestamp = null;
+        stmt.setTimestamp(1, expectedTimestamp);
+        stmt.executeUpdate();
+        conn.commit();
+        expectedTimestamp = new Timestamp(3000L);
+        stmt.setTimestamp(1, expectedTimestamp);
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1050;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        
+        
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
+        
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
+
+        long count1 = getRowCount(conn, tableName);
+        long count2 = getRowCount(conn, indexName);
+        assertEquals("Table should have 1 row", 1, count1);
+        assertEquals("Index should have 1 row", 1, count2);
+        conn.close();
+        
+        ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ 
NO_INDEX */ ts,v FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals(expectedTimestamp, rs.getTimestamp(1));
+        assertEquals(null, rs.getString(2));
+        assertFalse(rs.next());
+        
+        rs = conn.createStatement().executeQuery("SELECT \"0:TS\", \"0:V\" 
FROM " + indexName);
+        assertTrue(rs.next());
+        assertEquals(expectedTimestamp, rs.getTimestamp(1));
+        assertEquals(null, rs.getString(2));
+        assertFalse(rs.next());
+    }
+    
+    @Test
+    public void testSetIndexedColumnToNullAndValueAtSameTSWithStoreNulls2() 
throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        long ts = 1000;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);     
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 
CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, V VARCHAR, V2 VARCHAR, 
CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+        conn.close();
+
+        ts = 1010;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
tableName + "(k2,k1,ts) INCLUDE (V, v2)");
+        conn.close();
+        
+        ts = 1020;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);        
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " VALUES('aa','aa',?, '0')");
+        stmt.setTimestamp(1, new Timestamp(1000L));
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        Timestamp expectedTimestamp;
+        ts = 1040;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        stmt = conn.prepareStatement("UPSERT INTO " + tableName + " 
VALUES('aa','aa',?, null)");
+        expectedTimestamp = new Timestamp(3000L);
+        stmt.setTimestamp(1, expectedTimestamp);
+        stmt.executeUpdate();
+        conn.commit();
+        expectedTimestamp = null;
+        stmt.setTimestamp(1, expectedTimestamp);
+        stmt.executeUpdate();
+        conn.commit();
+        conn.close();
+        
+        ts = 1050;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        conn = DriverManager.getConnection(getUrl(), props);
+        
+        
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
+        
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
+
+        long count1 = getRowCount(conn, tableName);
+        long count2 = getRowCount(conn, indexName);
+        assertEquals("Table should have 1 row", 1, count1);
+        assertEquals("Index should have 1 row", 1, count2);
+        conn.close();
+        
+        ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ 
NO_INDEX */ ts,v FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals(expectedTimestamp, rs.getTimestamp(1));
+        assertEquals(null, rs.getString(2));
+        assertFalse(rs.next());
+        
+        rs = conn.createStatement().executeQuery("SELECT \"0:TS\", \"0:V\" 
FROM " + indexName);
+        assertTrue(rs.next());
+        assertEquals(expectedTimestamp, rs.getTimestamp(1));
+        assertEquals(null, rs.getString(2));
+        assertFalse(rs.next());
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31edca2c/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
index 9151577..70f29b1 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
@@ -52,9 +52,9 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TestUtil;
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -381,6 +381,7 @@ public class EndToEndCoveredIndexingIT {
      * @throws Exception on failure
      */
     @Test
+    @Ignore("PHOENIX-4057 Do not issue index updates for out of order 
mutation")
     public void testMultipleTimestampsInSingleDelete() throws Exception {
         HTable primary = createSetupTables(fam1);
 
@@ -505,6 +506,7 @@ public class EndToEndCoveredIndexingIT {
      * @throws Exception
      */
     @Test
+    @Ignore("PHOENIX-4057 Do not issue index updates for out of order 
mutation")
     public void testOutOfOrderUpdates() throws Exception {
         HTable primary = createSetupTables(fam1);
 
@@ -563,6 +565,7 @@ public class EndToEndCoveredIndexingIT {
      * @throws Exception on failure
      */
     @Test
+    @Ignore("PHOENIX-4057 Do not issue index updates for out of order 
mutation")
     public void testExceedVersionsOutOfOrderPut() throws Exception {
         // setup the index
         HTable primary = createSetupTables(fam2);
@@ -686,6 +689,7 @@ public class EndToEndCoveredIndexingIT {
      * @throws Exception on failure
      */
     @Test
+    @Ignore("PHOENIX-4057 Do not issue index updates for out of order 
mutation")
     public void testExceedVersionsOutOfOrderUpdates() throws Exception {
         HTable primary = createSetupTables(fam1);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31edca2c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 9c7ec2e..8d2bd83 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -221,35 +221,40 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
 
         // A.2 do a single pass first for the updates to the current state
         state.applyPendingUpdates();
-        long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, 
indexMetaData);
-        // if all the updates are the latest thing in the index, we are done - 
don't go and fix history
-        if (ColumnTracker.isNewestTime(minTs)) { return false; }
-
-        // A.3 otherwise, we need to roll up through the current state and get 
the 'correct' view of the
-        // index. after this, we have the correct view of the index, from the 
batch up to the index
-        while (!ColumnTracker.isNewestTime(minTs)) {
-            minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, 
indexMetaData);
-        }
-
-        // B. only cleanup the current state if we need to - its a huge waste 
of effort otherwise.
-        if (requireCurrentStateCleanup) {
-            // roll back the pending update. This is needed so we can remove 
all the 'old' index entries.
-            // We don't need to do the puts here, but just the deletes at the 
given timestamps since we
-            // just want to completely hide the incorrect entries.
-            state.rollback(batch.getKvs());
-            // setup state
-            state.setPendingUpdates(batch.getKvs());
-
-            // cleanup the pending batch. If anything in the correct history 
is covered by Deletes used to
-            // 'fix' history (same row key and ts), we just drop the delete 
(we don't want to drop both
-            // because the update may have a different set of columns or value 
based on the update).
-            cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, 
indexMetaData);
-
-            // have to roll the state forward again, so the current state is 
correct
-            state.applyPendingUpdates();
-            return true;
-        }
+        addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData);
+        // FIXME: PHOENIX-4057 do not attempt to issue index updates
+        // for out-of-order mutations since it corrupts the index.
         return false;
+        
+//        long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, 
indexMetaData);
+//        // if all the updates are the latest thing in the index, we are done 
- don't go and fix history
+//        if (ColumnTracker.isNewestTime(minTs)) { return false; }
+//
+//        // A.3 otherwise, we need to roll up through the current state and 
get the 'correct' view of the
+//        // index. after this, we have the correct view of the index, from 
the batch up to the index
+//        while (!ColumnTracker.isNewestTime(minTs)) {
+//            minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, 
indexMetaData);
+//        }
+//
+//        // B. only cleanup the current state if we need to - its a huge 
waste of effort otherwise.
+//        if (requireCurrentStateCleanup) {
+//            // roll back the pending update. This is needed so we can remove 
all the 'old' index entries.
+//            // We don't need to do the puts here, but just the deletes at 
the given timestamps since we
+//            // just want to completely hide the incorrect entries.
+//            state.rollback(batch.getKvs());
+//            // setup state
+//            state.setPendingUpdates(batch.getKvs());
+//
+//            // cleanup the pending batch. If anything in the correct history 
is covered by Deletes used to
+//            // 'fix' history (same row key and ts), we just drop the delete 
(we don't want to drop both
+//            // because the update may have a different set of columns or 
value based on the update).
+//            cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, 
indexMetaData);
+//
+//            // have to roll the state forward again, so the current state is 
correct
+//            state.applyPendingUpdates();
+//            return true;
+//        }
+//        return false;
     }
 
     private long addUpdateForGivenTimestamp(long ts, LocalTableState state, 
IndexUpdateManager updateMap, IndexMetaData indexMetaData)
@@ -308,6 +313,13 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
             if (trackerTs < minTs) {
                 minTs = tracker.getTS();
             }
+            
+            // FIXME: PHOENIX-4057 do not attempt to issue index updates
+            // for out-of-order mutations since it corrupts the index.
+            if (tracker.hasNewerTimestamps()) {
+                continue;
+            }
+            
             // track index hints for the next round. Hint if we need an update 
for that column for the
             // next timestamp. These columns clearly won't need to update as 
we go through time as they
             // already match the most recent possible thing.
@@ -392,6 +404,13 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
                 if (!d.isValid()) {
                     continue;
                 }
+                // FIXME: PHOENIX-4057 do not attempt to issue index updates
+                // for out-of-order mutations since it corrupts the index.
+                final ColumnTracker tracker = d.getIndexedColumns();
+                if (tracker.hasNewerTimestamps()) {
+                    continue;
+                }
+                
                 // override the timestamps in the delete to match the current 
batch.
                 Delete remove = (Delete)d.getUpdate();
                 remove.setTimestamp(ts);

Reply via email to