Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.2 d55bc82a0 -> 03152d03e
PHOENIX-4057 Do not issue index updates for out of order mutation Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/31edca2c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/31edca2c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/31edca2c Branch: refs/heads/4.x-HBase-1.2 Commit: 31edca2c15fddc42df32ae1a7d7c4736a29e7924 Parents: d55bc82 Author: James Taylor <jamestay...@apache.org> Authored: Tue Aug 1 16:41:01 2017 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Wed Aug 2 16:04:23 2017 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/ConcurrentMutationsIT.java | 3 - .../phoenix/end2end/OutOfOrderMutationsIT.java | 561 +++++++++++++++++++ .../example/EndToEndCoveredIndexingIT.java | 6 +- .../hbase/index/covered/NonTxIndexBuilder.java | 75 ++- 4 files changed, 613 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/31edca2c/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java index 19cb70e..9ed5174 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java @@ -52,7 +52,6 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.TestUtil; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import com.google.common.collect.Maps; @@ -80,7 +79,6 @@ public class ConcurrentMutationsIT extends BaseUniqueNamesOwnClusterIT { } @Test - @Ignore public void testSynchronousDeletesAndUpsertValues() throws Exception { final String tableName = generateUniqueName(); final String indexName = generateUniqueName(); @@ -164,7 +162,6 @@ public class ConcurrentMutationsIT extends BaseUniqueNamesOwnClusterIT { } @Test - @Ignore public void testConcurrentDeletesAndUpsertValues() throws Exception { final String tableName = generateUniqueName(); final String indexName = generateUniqueName(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/31edca2c/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java new file mode 100644 index 0000000..3cf7336 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java @@ -0,0 +1,561 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Properties; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Ignore; +import org.junit.Test; + +public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { + @Test + public void testOutOfOrderDelete() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + long ts = 1000; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0"); + conn.close(); + + ts = 1010; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(k2,k1,ts)"); + conn.close(); + + ts = 1020; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?)"); + stmt.setTimestamp(1, new Timestamp(1000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1040; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("DELETE FROM " + tableName + " WHERE k1='aa'"); + conn.commit(); + conn.close(); + + ts = 1030; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?)"); + stmt.setTimestamp(1, new Timestamp(2000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1050; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + + long count1 = getRowCount(conn, tableName); + long count2 = getRowCount(conn, indexName); + assertEquals(0, count1); + assertEquals(0, count2); + conn.close(); + + assertNoTimeStampAt(conn, indexName, 1030); + + /** + * + ************ dumping T000001;hconnection-0x2b137c55 ************** + aaaa/0:/1040/DeleteFamily/vlen=0/seqid=0 + aaaa/0:TS/1030/Put/vlen=12/seqid=0 + aaaa/0:TS/1020/Put/vlen=12/seqid=0 + aaaa/0:_0/1030/Put/vlen=1/seqid=0 + aaaa/0:_0/1020/Put/vlen=1/seqid=0 + ----------------------------------------------- + ************ dumping T000002;hconnection-0x2b137c55 ************** + aaaa\xC2\x0B/0:/1040/DeleteFamily/vlen=0/seqid=0 + aaaa\xC2\x0B/0:_0/1020/Put/vlen=2/seqid=0 + ----------------------------------------------- + */ + } + + private static void assertNoTimeStampAt(Connection conn, String tableName, long ts) throws SQLException, IOException { + Scan scan = new Scan(); + scan.setRaw(true); + scan.setMaxVersions(); + HTableInterface indexHTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)); + ResultScanner scanner = indexHTable.getScanner(scan); + Result result; + while ((result = scanner.next()) != null) { + CellScanner cellScanner = result.cellScanner(); + while (cellScanner.advance()) { + assertNotEquals(ts, cellScanner.current().getTimestamp()); + } + } + } + + @Test + public void testOutOfOrderUpsert() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + long ts = 1000; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0"); + conn.close(); + + ts = 1010; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(k2,k1,ts)"); + conn.close(); + + ts = 1020; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?)"); + stmt.setTimestamp(1, new Timestamp(1000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1040; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?)"); + Timestamp expectedTimestamp = new Timestamp(3000L); + stmt.setTimestamp(1, expectedTimestamp); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1030; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?)"); + stmt.setTimestamp(1, new Timestamp(2000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1050; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + + long count1 = getRowCount(conn, tableName); + long count2 = getRowCount(conn, indexName); + assertEquals("Table should have 1 row", 1, count1); + assertEquals("Index should have 1 row", 1, count2); + conn.close(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName); + assertTrue(rs.next()); + assertEquals(expectedTimestamp, rs.getTimestamp(1)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("SELECT \"0:TS\" FROM " + indexName); + assertTrue(rs.next()); + assertEquals(expectedTimestamp, rs.getTimestamp(1)); + assertFalse(rs.next()); + + assertNoTimeStampAt(conn, indexName, 1030); + + /** + * + ************ dumping T000001;hconnection-0x3e7a21d ************** + aaaa/0:TS/1040/Put/vlen=12/seqid=0 + aaaa/0:TS/1030/Put/vlen=12/seqid=0 + aaaa/0:TS/1020/Put/vlen=12/seqid=0 + aaaa/0:_0/1040/Put/vlen=1/seqid=0 + aaaa/0:_0/1030/Put/vlen=1/seqid=0 + aaaa/0:_0/1020/Put/vlen=1/seqid=0 + ----------------------------------------------- + ************ dumping T000002;hconnection-0x3e7a21d ************** + aaaa\xC2\x0B/0:/1040/DeleteFamily/vlen=0/seqid=0 + aaaa\xC2\x0B/0:_0/1020/Put/vlen=2/seqid=0 + aaaa\xC2\x1F/0:_0/1040/Put/vlen=2/seqid=0 + ----------------------------------------------- + */ + } + + private static long getRowCount(Connection conn, String tableName) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + tableName); + assertTrue(rs.next()); + return rs.getLong(1); + } + + @Test + public void testSetIndexedColumnToNull() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + long ts = 1000; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0"); + conn.close(); + + ts = 1010; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(k2,k1,ts)"); + conn.close(); + + ts = 1020; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?)"); + stmt.setTimestamp(1, new Timestamp(1000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1040; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa', ?)"); + Timestamp expectedTimestamp = new Timestamp(3000L); + stmt.setTimestamp(1, expectedTimestamp); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1030; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',null)"); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1050; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); + + long count1 = getRowCount(conn, tableName); + long count2 = getRowCount(conn, indexName); + assertEquals("Table should have 1 row", 1, count1); + assertEquals("Index should have 1 row", 1, count2); + conn.close(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName); + assertTrue(rs.next()); + assertEquals(expectedTimestamp, rs.getTimestamp(1)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("SELECT \"0:TS\" FROM " + indexName); + assertTrue(rs.next()); + assertEquals(expectedTimestamp, rs.getTimestamp(1)); + assertFalse(rs.next()); + } + + @Test + public void testSetIndexedColumnToNull2() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + long ts = 1000; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0"); + conn.close(); + + ts = 1010; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(k2,k1,ts)"); + conn.close(); + + ts = 1020; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?)"); + stmt.setTimestamp(1, new Timestamp(1000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1040; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?)"); + Timestamp expectedTimestamp = null; + stmt.setTimestamp(1, expectedTimestamp); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1030; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa', ?)"); + stmt.setTimestamp(1, new Timestamp(3000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1050; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); + + long count1 = getRowCount(conn, tableName); + long count2 = getRowCount(conn, indexName); + assertEquals("Table should have 1 row", 1, count1); + assertEquals("Index should have 1 row", 1, count2); + conn.close(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName); + assertTrue(rs.next()); + assertEquals(expectedTimestamp, rs.getTimestamp(1)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("SELECT \"0:TS\" FROM " + indexName); + assertTrue(rs.next()); + assertEquals(expectedTimestamp, rs.getTimestamp(1)); + assertFalse(rs.next()); + } + + @Test + @Ignore("PHOENIX-4058 Generate correct index updates when DeleteColumn processed before Put with same timestamp") + public void testSetIndexedColumnToNullAndValueAtSameTS() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + long ts = 1000; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, V VARCHAR, V2 VARCHAR, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0"); + conn.close(); + + ts = 1010; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(k2,k1,ts) INCLUDE (V, v2)"); + conn.close(); + + ts = 1020; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, '0')"); + stmt.setTimestamp(1, new Timestamp(1000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + Timestamp expectedTimestamp; + ts = 1040; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, null)"); + expectedTimestamp = null; + stmt.setTimestamp(1, expectedTimestamp); + stmt.executeUpdate(); + conn.commit(); + stmt.setTimestamp(1, new Timestamp(3000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1050; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); + + long count1 = getRowCount(conn, tableName); + long count2 = getRowCount(conn, indexName); + assertEquals("Table should have 1 row", 1, count1); + assertEquals("Index should have 1 row", 1, count2); + conn.close(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName); + assertTrue(rs.next()); + assertEquals(expectedTimestamp, rs.getTimestamp(1)); + assertEquals(null, rs.getString(2)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("SELECT \"0:TS\", \"0:V\" FROM " + indexName); + assertTrue(rs.next()); + assertEquals(expectedTimestamp, rs.getTimestamp(1)); + assertEquals(null, rs.getString(2)); + assertFalse(rs.next()); + } + + @Test + public void testSetIndexedColumnToNullAndValueAtSameTSWithStoreNulls1() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + long ts = 1000; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, V VARCHAR, V2 VARCHAR, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.close(); + + ts = 1010; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(k2,k1,ts) INCLUDE (V, v2)"); + conn.close(); + + ts = 1020; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, '0')"); + stmt.setTimestamp(1, new Timestamp(1000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + Timestamp expectedTimestamp; + ts = 1040; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, null)"); + expectedTimestamp = null; + stmt.setTimestamp(1, expectedTimestamp); + stmt.executeUpdate(); + conn.commit(); + expectedTimestamp = new Timestamp(3000L); + stmt.setTimestamp(1, expectedTimestamp); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1050; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); + + long count1 = getRowCount(conn, tableName); + long count2 = getRowCount(conn, indexName); + assertEquals("Table should have 1 row", 1, count1); + assertEquals("Index should have 1 row", 1, count2); + conn.close(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName); + assertTrue(rs.next()); + assertEquals(expectedTimestamp, rs.getTimestamp(1)); + assertEquals(null, rs.getString(2)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("SELECT \"0:TS\", \"0:V\" FROM " + indexName); + assertTrue(rs.next()); + assertEquals(expectedTimestamp, rs.getTimestamp(1)); + assertEquals(null, rs.getString(2)); + assertFalse(rs.next()); + } + + @Test + public void testSetIndexedColumnToNullAndValueAtSameTSWithStoreNulls2() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + long ts = 1000; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, V VARCHAR, V2 VARCHAR, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.close(); + + ts = 1010; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(k2,k1,ts) INCLUDE (V, v2)"); + conn.close(); + + ts = 1020; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, '0')"); + stmt.setTimestamp(1, new Timestamp(1000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + Timestamp expectedTimestamp; + ts = 1040; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, null)"); + expectedTimestamp = new Timestamp(3000L); + stmt.setTimestamp(1, expectedTimestamp); + stmt.executeUpdate(); + conn.commit(); + expectedTimestamp = null; + stmt.setTimestamp(1, expectedTimestamp); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1050; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); + + long count1 = getRowCount(conn, tableName); + long count2 = getRowCount(conn, indexName); + assertEquals("Table should have 1 row", 1, count1); + assertEquals("Index should have 1 row", 1, count2); + conn.close(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName); + assertTrue(rs.next()); + assertEquals(expectedTimestamp, rs.getTimestamp(1)); + assertEquals(null, rs.getString(2)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("SELECT \"0:TS\", \"0:V\" FROM " + indexName); + assertTrue(rs.next()); + assertEquals(expectedTimestamp, rs.getTimestamp(1)); + assertEquals(null, rs.getString(2)); + assertFalse(rs.next()); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/31edca2c/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java index 9151577..70f29b1 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java @@ -52,9 +52,9 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.TestUtil; -import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -381,6 +381,7 @@ public class EndToEndCoveredIndexingIT { * @throws Exception on failure */ @Test + @Ignore("PHOENIX-4057 Do not issue index updates for out of order mutation") public void testMultipleTimestampsInSingleDelete() throws Exception { HTable primary = createSetupTables(fam1); @@ -505,6 +506,7 @@ public class EndToEndCoveredIndexingIT { * @throws Exception */ @Test + @Ignore("PHOENIX-4057 Do not issue index updates for out of order mutation") public void testOutOfOrderUpdates() throws Exception { HTable primary = createSetupTables(fam1); @@ -563,6 +565,7 @@ public class EndToEndCoveredIndexingIT { * @throws Exception on failure */ @Test + @Ignore("PHOENIX-4057 Do not issue index updates for out of order mutation") public void testExceedVersionsOutOfOrderPut() throws Exception { // setup the index HTable primary = createSetupTables(fam2); @@ -686,6 +689,7 @@ public class EndToEndCoveredIndexingIT { * @throws Exception on failure */ @Test + @Ignore("PHOENIX-4057 Do not issue index updates for out of order mutation") public void testExceedVersionsOutOfOrderUpdates() throws Exception { HTable primary = createSetupTables(fam1); http://git-wip-us.apache.org/repos/asf/phoenix/blob/31edca2c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java index 9c7ec2e..8d2bd83 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java @@ -221,35 +221,40 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { // A.2 do a single pass first for the updates to the current state state.applyPendingUpdates(); - long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData); - // if all the updates are the latest thing in the index, we are done - don't go and fix history - if (ColumnTracker.isNewestTime(minTs)) { return false; } - - // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the - // index. after this, we have the correct view of the index, from the batch up to the index - while (!ColumnTracker.isNewestTime(minTs)) { - minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, indexMetaData); - } - - // B. only cleanup the current state if we need to - its a huge waste of effort otherwise. - if (requireCurrentStateCleanup) { - // roll back the pending update. This is needed so we can remove all the 'old' index entries. - // We don't need to do the puts here, but just the deletes at the given timestamps since we - // just want to completely hide the incorrect entries. - state.rollback(batch.getKvs()); - // setup state - state.setPendingUpdates(batch.getKvs()); - - // cleanup the pending batch. If anything in the correct history is covered by Deletes used to - // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both - // because the update may have a different set of columns or value based on the update). - cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, indexMetaData); - - // have to roll the state forward again, so the current state is correct - state.applyPendingUpdates(); - return true; - } + addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData); + // FIXME: PHOENIX-4057 do not attempt to issue index updates + // for out-of-order mutations since it corrupts the index. return false; + +// long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData); +// // if all the updates are the latest thing in the index, we are done - don't go and fix history +// if (ColumnTracker.isNewestTime(minTs)) { return false; } +// +// // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the +// // index. after this, we have the correct view of the index, from the batch up to the index +// while (!ColumnTracker.isNewestTime(minTs)) { +// minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, indexMetaData); +// } +// +// // B. only cleanup the current state if we need to - its a huge waste of effort otherwise. +// if (requireCurrentStateCleanup) { +// // roll back the pending update. This is needed so we can remove all the 'old' index entries. +// // We don't need to do the puts here, but just the deletes at the given timestamps since we +// // just want to completely hide the incorrect entries. +// state.rollback(batch.getKvs()); +// // setup state +// state.setPendingUpdates(batch.getKvs()); +// +// // cleanup the pending batch. If anything in the correct history is covered by Deletes used to +// // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both +// // because the update may have a different set of columns or value based on the update). +// cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, indexMetaData); +// +// // have to roll the state forward again, so the current state is correct +// state.applyPendingUpdates(); +// return true; +// } +// return false; } private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap, IndexMetaData indexMetaData) @@ -308,6 +313,13 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { if (trackerTs < minTs) { minTs = tracker.getTS(); } + + // FIXME: PHOENIX-4057 do not attempt to issue index updates + // for out-of-order mutations since it corrupts the index. + if (tracker.hasNewerTimestamps()) { + continue; + } + // track index hints for the next round. Hint if we need an update for that column for the // next timestamp. These columns clearly won't need to update as we go through time as they // already match the most recent possible thing. @@ -392,6 +404,13 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { if (!d.isValid()) { continue; } + // FIXME: PHOENIX-4057 do not attempt to issue index updates + // for out-of-order mutations since it corrupts the index. + final ColumnTracker tracker = d.getIndexedColumns(); + if (tracker.hasNewerTimestamps()) { + continue; + } + // override the timestamps in the delete to match the current batch. Delete remove = (Delete)d.getUpdate(); remove.setTimestamp(ts);