http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java new file mode 100644 index 0000000..127c988 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java @@ -0,0 +1,602 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Properties; + +import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.primitives.Doubles; + +@RunWith(Parameterized.class) +public class MutableIndexIT extends BaseHBaseManagedTimeIT { + + protected final boolean localIndex; + private final String tableDDLOptions; + + public MutableIndexIT(boolean localIndex, boolean transactional) { + this.localIndex = localIndex; + StringBuilder optionBuilder = new StringBuilder(); + if (transactional) { + optionBuilder.append("TRANSACTIONAL=true"); + } + this.tableDDLOptions = optionBuilder.toString(); + } + + @Parameters(name="localIndex = {0} , transactional = {1}") + public static Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { + { false, false }, { false, true }, { true, false }, { true, true } + }); + } + + @Test + public void testCoveredColumnUpdates() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + // create unique table and index names for each parameterized test + String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis(); + String indexName = "IDX" + "_" + System.currentTimeMillis(); + String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); + String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + + createMultiCFTestTable(fullTableName, tableDDLOptions); + populateMultiCFTestTable(fullTableName); + PreparedStatement stmt = conn.prepareStatement("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + + " (char_col1 ASC, int_col1 ASC) INCLUDE (long_col1, long_col2)"); + stmt.execute(); + + String query = "SELECT char_col1, int_col1, long_col2 from " + fullTableName; + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query); + if (localIndex) { + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName +" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); + } else { + assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs)); + } + + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("chara", rs.getString(1)); + assertEquals(2, rs.getInt(2)); + assertEquals(3L, rs.getLong(3)); + assertTrue(rs.next()); + assertEquals("chara", rs.getString(1)); + assertEquals(3, rs.getInt(2)); + assertEquals(4L, rs.getLong(3)); + assertTrue(rs.next()); + assertEquals("chara", rs.getString(1)); + assertEquals(4, rs.getInt(2)); + assertEquals(5L, rs.getLong(3)); + assertFalse(rs.next()); + + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + + "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2*2 FROM " + + fullTableName + " WHERE long_col2=?"); + stmt.setLong(1,4L); + assertEquals(1,stmt.executeUpdate()); + conn.commit(); + + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("chara", rs.getString(1)); + assertEquals(2, rs.getInt(2)); + assertEquals(3L, rs.getLong(3)); + assertTrue(rs.next()); + assertEquals("chara", rs.getString(1)); + assertEquals(3, rs.getInt(2)); + assertEquals(8L, rs.getLong(3)); + assertTrue(rs.next()); + assertEquals("chara", rs.getString(1)); + assertEquals(4, rs.getInt(2)); + assertEquals(5L, rs.getLong(3)); + assertFalse(rs.next()); + + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + + "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, null FROM " + + fullTableName + " WHERE long_col2=?"); + stmt.setLong(1,3L); + assertEquals(1,stmt.executeUpdate()); + conn.commit(); + + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("chara", rs.getString(1)); + assertEquals(2, rs.getInt(2)); + assertEquals(0, rs.getLong(3)); + assertTrue(rs.wasNull()); + assertTrue(rs.next()); + assertEquals("chara", rs.getString(1)); + assertEquals(3, rs.getInt(2)); + assertEquals(8L, rs.getLong(3)); + assertTrue(rs.next()); + assertEquals("chara", rs.getString(1)); + assertEquals(4, rs.getInt(2)); + assertEquals(5L, rs.getLong(3)); + assertFalse(rs.next()); + if(localIndex) { + query = "SELECT b.* from " + fullTableName + " where int_col1 = 4"; + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName +" [-32768]\n" + + " SERVER FILTER BY TO_INTEGER(\"INT_COL1\") = 4\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("varchar_b", rs.getString(1)); + assertEquals("charb", rs.getString(2)); + assertEquals(5, rs.getInt(3)); + assertEquals(5, rs.getLong(4)); + assertFalse(rs.next()); + + } + } + } + + @Test + public void testCoveredColumns() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + String query; + ResultSet rs; + // create unique table and index names for each parameterized test + String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis(); + String indexName = "IDX" + "_" + System.currentTimeMillis(); + String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); + String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); + query = "SELECT * FROM " + fullTableName; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + + conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); + query = "SELECT * FROM " + fullIndexName; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); + stmt.setString(1,"a"); + stmt.setString(2, "x"); + stmt.setString(3, "1"); + stmt.execute(); + conn.commit(); + + query = "SELECT * FROM " + fullIndexName; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("x",rs.getString(1)); + assertEquals("a",rs.getString(2)); + assertEquals("1",rs.getString(3)); + assertFalse(rs.next()); + + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)"); + stmt.setString(1,"a"); + stmt.setString(2, null); + stmt.execute(); + conn.commit(); + + query = "SELECT * FROM " + fullIndexName; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("x",rs.getString(1)); + assertEquals("a",rs.getString(2)); + assertNull(rs.getString(3)); + assertFalse(rs.next()); + + query = "SELECT * FROM " + fullTableName; + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + if(localIndex) { + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); + } else { + assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs)); + } + + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals("x",rs.getString(2)); + assertNull(rs.getString(3)); + assertFalse(rs.next()); + + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)"); + stmt.setString(1,"a"); + stmt.setString(2,"3"); + stmt.execute(); + conn.commit(); + + query = "SELECT * FROM " + fullTableName; + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + if(localIndex) { + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName + " [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); + } else { + assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs)); + } + + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals("x",rs.getString(2)); + assertEquals("3",rs.getString(3)); + assertFalse(rs.next()); + + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)"); + stmt.setString(1,"a"); + stmt.setString(2,"4"); + stmt.execute(); + conn.commit(); + + query = "SELECT * FROM " + fullTableName; + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + if(localIndex) { + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); + } else { + assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs)); + } + + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals("x",rs.getString(2)); + assertEquals("4",rs.getString(3)); + assertFalse(rs.next()); + } + } + + @Test + public void testCompoundIndexKey() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + String query; + ResultSet rs; + // create unique table and index names for each parameterized test + String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis(); + String indexName = "IDX" + "_" + System.currentTimeMillis(); + String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); + String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + + // make sure that the tables are empty, but reachable + conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); + query = "SELECT * FROM " + fullTableName; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); + query = "SELECT * FROM " + fullIndexName; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + + // load some data into the table + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); + stmt.setString(1,"a"); + stmt.setString(2, "x"); + stmt.setString(3, "1"); + stmt.execute(); + conn.commit(); + + query = "SELECT * FROM " + fullIndexName; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("x",rs.getString(1)); + assertEquals("1",rs.getString(2)); + assertEquals("a",rs.getString(3)); + assertFalse(rs.next()); + + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); + stmt.setString(1,"a"); + stmt.setString(2, "y"); + stmt.setString(3, null); + stmt.execute(); + conn.commit(); + + query = "SELECT * FROM " + fullIndexName; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("y",rs.getString(1)); + assertNull(rs.getString(2)); + assertEquals("a",rs.getString(3)); + assertFalse(rs.next()); + + query = "SELECT * FROM " + fullTableName; + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + if (localIndex) { + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); + } else { + assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n" + + " SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs)); + } + //make sure the data table looks like what we expect + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals("y",rs.getString(2)); + assertNull(rs.getString(3)); + assertFalse(rs.next()); + + // Upsert new row with null leading index column + stmt.setString(1,"b"); + stmt.setString(2, null); + stmt.setString(3, "3"); + stmt.execute(); + conn.commit(); + + query = "SELECT * FROM " + fullIndexName; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals(null,rs.getString(1)); + assertEquals("3",rs.getString(2)); + assertEquals("b",rs.getString(3)); + assertTrue(rs.next()); + assertEquals("y",rs.getString(1)); + assertNull(rs.getString(2)); + assertEquals("a",rs.getString(3)); + assertFalse(rs.next()); + + // Update row with null leading index column to have a value + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?)"); + stmt.setString(1,"b"); + stmt.setString(2, "z"); + stmt.execute(); + conn.commit(); + + query = "SELECT * FROM " + fullIndexName; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("y",rs.getString(1)); + assertNull(rs.getString(2)); + assertEquals("a",rs.getString(3)); + assertTrue(rs.next()); + assertEquals("z",rs.getString(1)); + assertEquals("3",rs.getString(2)); + assertEquals("b",rs.getString(3)); + assertFalse(rs.next()); + } + + } + + /** + * There was a case where if there were multiple updates to a single row in the same batch, the + * index wouldn't be updated correctly as each element of the batch was evaluated with the state + * previous to the batch, rather than with the rest of the batch. This meant you could do a put + * and a delete on a row in the same batch and the index result would contain the current + put + * and current + delete, but not current + put + delete. + * @throws Exception on failure + */ + @Test + public void testMultipleUpdatesToSingleRow() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + String query; + ResultSet rs; + // create unique table and index names for each parameterized test + String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis(); + String indexName = "IDX" + "_" + System.currentTimeMillis(); + String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); + String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + + // make sure that the tables are empty, but reachable + conn.createStatement().execute( + "CREATE TABLE " + fullTableName + + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); + query = "SELECT * FROM " + fullTableName; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + + conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); + query = "SELECT * FROM " + fullIndexName; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + + // load some data into the table + PreparedStatement stmt = + conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); + stmt.setString(1, "a"); + stmt.setString(2, "x"); + stmt.setString(3, "1"); + stmt.execute(); + conn.commit(); + + // make sure the index is working as expected + query = "SELECT * FROM " + fullIndexName; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("1", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + // do multiple updates to the same row, in the same batch + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k, v1) VALUES(?,?)"); + stmt.setString(1, "a"); + stmt.setString(2, "y"); + stmt.execute(); + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)"); + stmt.setString(1, "a"); + stmt.setString(2, null); + stmt.execute(); + conn.commit(); + + query = "SELECT * FROM " + fullIndexName; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("y", rs.getString(1)); + assertNull(rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + query = "SELECT * FROM " + fullTableName; + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + if(localIndex) { + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + "CLIENT MERGE SORT", + QueryUtil.getExplainPlan(rs)); + } else { + assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n" + + " SERVER FILTER BY FIRST KEY ONLY", + QueryUtil.getExplainPlan(rs)); + } + + // check that the data table matches as expected + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertNull(rs.getString(3)); + assertFalse(rs.next()); + } + } + + @Test + public void testUpsertingNullForIndexedColumns() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + ResultSet rs; + // create unique table and index names for each parameterized test + String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis(); + String indexName = "IDX" + "_" + System.currentTimeMillis(); + String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); + String fullIndexeName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + fullTableName + "(v1 VARCHAR PRIMARY KEY, v2 DOUBLE, v3 VARCHAR) "+tableDDLOptions); + stmt.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + " (v2) INCLUDE(v3)"); + + //create a row with value null for indexed column v2 + stmt.executeUpdate("upsert into " + fullTableName + " values('cc1', null, 'abc')"); + conn.commit(); + + //assert values in index table + rs = stmt.executeQuery("select * from " + fullIndexeName); + assertTrue(rs.next()); + assertEquals(0, Doubles.compare(0, rs.getDouble(1))); + assertTrue(rs.wasNull()); + assertEquals("cc1", rs.getString(2)); + assertEquals("abc", rs.getString(3)); + assertFalse(rs.next()); + + //assert values in data table + rs = stmt.executeQuery("select v1, v2, v3 from " + fullTableName); + assertTrue(rs.next()); + assertEquals("cc1", rs.getString(1)); + assertEquals(0, Doubles.compare(0, rs.getDouble(2))); + assertTrue(rs.wasNull()); + assertEquals("abc", rs.getString(3)); + assertFalse(rs.next()); + + //update the previously null value for indexed column v2 to a non-null value 1.23 + stmt.executeUpdate("upsert into " + fullTableName + " values('cc1', 1.23, 'abc')"); + conn.commit(); + + //assert values in data table + rs = stmt.executeQuery("select /*+ NO_INDEX */ v1, v2, v3 from " + fullTableName); + assertTrue(rs.next()); + assertEquals("cc1", rs.getString(1)); + assertEquals(0, Doubles.compare(1.23, rs.getDouble(2))); + assertEquals("abc", rs.getString(3)); + assertFalse(rs.next()); + + //assert values in index table + rs = stmt.executeQuery("select * from " + fullIndexeName); + assertTrue(rs.next()); + assertEquals(0, Doubles.compare(1.23, rs.getDouble(1))); + assertEquals("cc1", rs.getString(2)); + assertEquals("abc", rs.getString(3)); + assertFalse(rs.next()); + + //update the value for indexed column v2 back to null + stmt.executeUpdate("upsert into " + fullTableName + " values('cc1', null, 'abc')"); + conn.commit(); + + //assert values in index table + rs = stmt.executeQuery("select * from " + fullIndexeName); + assertTrue(rs.next()); + assertEquals(0, Doubles.compare(0, rs.getDouble(1))); + assertTrue(rs.wasNull()); + assertEquals("cc1", rs.getString(2)); + assertEquals("abc", rs.getString(3)); + assertFalse(rs.next()); + + //assert values in data table + rs = stmt.executeQuery("select v1, v2, v3 from " + fullTableName); + assertTrue(rs.next()); + assertEquals("cc1", rs.getString(1)); + assertEquals(0, Doubles.compare(0, rs.getDouble(2))); + assertEquals("abc", rs.getString(3)); + assertFalse(rs.next()); + } + } + + + private void assertImmutableRows(Connection conn, String fullTableName, boolean expectedValue) throws SQLException { + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + assertEquals(expectedValue, pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isImmutableRows()); + } + + @Test + public void testAlterTableWithImmutability() throws Exception { + String query; + ResultSet rs; + String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis(); + String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + conn.createStatement().execute( + "CREATE TABLE " + fullTableName +" (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR) " + tableDDLOptions); + + query = "SELECT * FROM " + fullTableName; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + + assertImmutableRows(conn,fullTableName, false); + conn.createStatement().execute("ALTER TABLE " + fullTableName +" SET IMMUTABLE_ROWS=true"); + assertImmutableRows(conn,fullTableName, true); + + + conn.createStatement().execute("ALTER TABLE " + fullTableName +" SET immutable_rows=false"); + assertImmutableRows(conn,fullTableName, false); + } + } + +}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java deleted file mode 100644 index 5196b0a..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end.index; - -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; -import java.util.Map; -import java.util.Properties; - -import org.apache.phoenix.end2end.Shadower; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.google.common.collect.Maps; - -public class TxGlobalMutableIndexIT extends GlobalMutableIndexIT { - - @BeforeClass - @Shadower(classBeingShadowed = BaseMutableIndexIT.class) - public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(3); - // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan - // Forces server cache to be used - props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2)); - props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); - props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true)); - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - } - - @Test - public void testRollbackOfUncommittedIndexChange() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - try { - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE DEMO(v1 VARCHAR PRIMARY KEY, v2 VARCHAR, v3 VARCHAR)"); - stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2) INCLUDE(v3)"); - - stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')"); - - //assert values in data table - ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO"); - assertTrue(rs.next()); - assertEquals("x", rs.getString(1)); - assertEquals("y", rs.getString(2)); - assertEquals("a", rs.getString(3)); - assertFalse(rs.next()); - - conn.rollback(); - - //assert values in data table - rs = stmt.executeQuery("select v1, v2, v3 from DEMO"); - assertFalse(rs.next()); - - } finally { - conn.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java deleted file mode 100644 index e9d685f..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end.index; - -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; -import java.util.Map; -import java.util.Properties; - -import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; -import org.apache.phoenix.end2end.Shadower; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.google.common.collect.Maps; - -public class TxImmutableIndexIT extends ImmutableIndexIT { - - @BeforeClass - @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) - public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(3); - // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan - // Forces server cache to be used - props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true)); - // We need this b/c we don't allow a transactional table to be created if the underlying - // HBase table already exists (since we don't know if it was transactional before). - props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - } - - // TODO: need test case with mix of mutable and immutable indexes - @Test - public void testRollbackOfUncommittedKeyValueIndexChange() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - try { - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE DEMO(v1 VARCHAR PRIMARY KEY, v2 VARCHAR, v3 VARCHAR) IMMUTABLE_ROWS=true"); - stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2) INCLUDE(v3)"); - - stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')"); - - //assert values in data table - ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO"); - assertTrue(rs.next()); - assertEquals("x", rs.getString(1)); - assertEquals("y", rs.getString(2)); - assertEquals("a", rs.getString(3)); - assertFalse(rs.next()); - - conn.rollback(); - - //assert values in data table - rs = stmt.executeQuery("select v1, v2, v3 from DEMO"); - assertFalse(rs.next()); - - } finally { - conn.close(); - } - } - - // TODO: need test case with mix of mutable and immutable indexes - @Test - public void testRollbackOfUncommittedRowKeyIndexChange() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - try { - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE DEMO(v1 VARCHAR, v2 VARCHAR, v3 VARCHAR, CONSTRAINT pk PRIMARY KEY (v1, v2)) IMMUTABLE_ROWS=true"); - stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2, v1)"); - - stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')"); - - //assert values in data table - ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO"); - assertTrue(rs.next()); - assertEquals("x", rs.getString(1)); - assertEquals("y", rs.getString(2)); - assertEquals("a", rs.getString(3)); - assertFalse(rs.next()); - - conn.rollback(); - - //assert values in data table - rs = stmt.executeQuery("select v1, v2, v3 from DEMO"); - assertFalse(rs.next()); - - } finally { - conn.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java new file mode 100644 index 0000000..4131c2d --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java @@ -0,0 +1,260 @@ +package org.apache.phoenix.end2end.index.txn; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; + +import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.end2end.Shadower; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Maps; + +@RunWith(Parameterized.class) +public class MutableRollbackIT extends BaseHBaseManagedTimeIT { + + private final boolean localIndex; + + public MutableRollbackIT(boolean localIndex) { + this.localIndex = localIndex; + } + + @BeforeClass + @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(2); + props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true)); + // We need this b/c we don't allow a transactional table to be created if the underlying + // HBase table already exists (since we don't know if it was transactional before). + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Parameters(name="localIndex = {0}") + public static Collection<Boolean> data() { + return Arrays.asList(new Boolean[] { + false, true + }); + } + + @Test + public void testRollbackOfUncommittedExistingKeyValueIndexUpdate() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + try { + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE DEMO1(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + stmt.execute("CREATE TABLE DEMO2(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true"); + stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX DEMO1_idx ON DEMO1 (v1) INCLUDE(v2)"); + stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX DEMO2_idx ON DEMO2 (v1) INCLUDE(v2)"); + + stmt.executeUpdate("upsert into DEMO1 values('x', 'y', 'a')"); + conn.commit(); + + //assert rows exists in DEMO1 + ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + //assert rows exists in DEMO1_idx + rs = stmt.executeQuery("select k, v1, v2 from DEMO1 ORDER BY v1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + //assert no rows exists in DEMO2 + rs = stmt.executeQuery("select k, v1, v2 from DEMO2"); + assertFalse(rs.next()); + + //assert no rows exists in DEMO2_idx + rs = stmt.executeQuery("select k, v1 from DEMO2 ORDER BY v1"); + assertFalse(rs.next()); + + stmt.executeUpdate("upsert into DEMO1 values('x', 'y', 'b')"); + stmt.executeUpdate("upsert into DEMO2 values('a', 'b', 'c')"); + + //assert new covered column value + rs = stmt.executeQuery("select k, v1, v2 from DEMO1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("b", rs.getString(3)); + assertFalse(rs.next()); + + //assert new covered column value + rs = stmt.executeQuery("select k, v1, v2 from DEMO1 ORDER BY v1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("b", rs.getString(3)); + assertFalse(rs.next()); + + //assert rows exists in DEMO2 + rs = stmt.executeQuery("select k, v1, v2 from DEMO2"); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("b", rs.getString(2)); + assertEquals("c", rs.getString(3)); + assertFalse(rs.next()); + + //assert rows exists in DEMO2 index table + rs = stmt.executeQuery("select k, v1 from DEMO2 ORDER BY v1"); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("b", rs.getString(2)); + assertFalse(rs.next()); + + conn.rollback(); + + //assert original row exists in DEMO1 + rs = stmt.executeQuery("select k, v1, v2 from DEMO1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + //assert original row exists in DEMO1_idx + rs = stmt.executeQuery("select k, v1, v2 from DEMO1 ORDER BY v1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + //assert no rows exists in DEMO2 + rs = stmt.executeQuery("select k, v1, v2 from DEMO2"); + assertFalse(rs.next()); + + //assert no rows exists in DEMO2_idx + rs = stmt.executeQuery("select k, v1 from DEMO2 ORDER BY v1"); + assertFalse(rs.next()); + } finally { + conn.close(); + } + } + + @Test + public void testRollbackOfUncommittedExistingRowKeyIndexUpdate() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + try { + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE DEMO1(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + stmt.execute("CREATE TABLE DEMO2(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true"); + stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX DEMO1_idx ON DEMO1 (v1, k)"); + stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX DEMO2_idx ON DEMO2 (v1, k)"); + + stmt.executeUpdate("upsert into DEMO1 values('x', 'y', 'a')"); + conn.commit(); + + //assert rows exists in DEMO1 + ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + //assert rows exists in DEMO1_idx + rs = stmt.executeQuery("select k, v1 from DEMO1 ORDER BY v1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertFalse(rs.next()); + + //assert no rows exists in DEMO2 + rs = stmt.executeQuery("select k, v1, v2 from DEMO2"); + assertFalse(rs.next()); + + //assert no rows exists in DEMO2_idx + rs = stmt.executeQuery("select k, v1 from DEMO2 ORDER BY v1"); + assertFalse(rs.next()); + + stmt.executeUpdate("upsert into DEMO1 values('x', 'z', 'a')"); + stmt.executeUpdate("upsert into DEMO2 values('a', 'b', 'c')"); + + //assert new covered row key value exists in DEMO1 + rs = stmt.executeQuery("select k, v1, v2 from DEMO1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("z", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + //assert new covered row key value exists in DEMO1_idx + rs = stmt.executeQuery("select k, v1 from DEMO1 ORDER BY v1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("z", rs.getString(2)); + assertFalse(rs.next()); + + //assert rows exists in DEMO2 + rs = stmt.executeQuery("select k, v1, v2 from DEMO2"); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("b", rs.getString(2)); + assertEquals("c", rs.getString(3)); + assertFalse(rs.next()); + + //assert rows exists in DEMO2 index table + rs = stmt.executeQuery("select k, v1 from DEMO2 ORDER BY v1"); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("b", rs.getString(2)); + assertFalse(rs.next()); + + conn.rollback(); + + //assert original row exists in DEMO1 + rs = stmt.executeQuery("select k, v1, v2 from DEMO1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + //assert original row exists in DEMO1_idx + rs = stmt.executeQuery("select k, v1 from DEMO1 ORDER BY v1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertFalse(rs.next()); + + //assert no rows exists in DEMO2 + rs = stmt.executeQuery("select k, v1, v2 from DEMO2"); + assertFalse(rs.next()); + + //assert no rows exists in DEMO2_idx + rs = stmt.executeQuery("select k, v1 from DEMO2 ORDER BY v1"); + assertFalse(rs.next()); + } finally { + conn.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java new file mode 100644 index 0000000..b7ec3c6 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java @@ -0,0 +1,144 @@ +package org.apache.phoenix.end2end.index.txn; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; + +import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.end2end.Shadower; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Maps; + +@RunWith(Parameterized.class) +public class RollbackIT extends BaseHBaseManagedTimeIT { + + private final boolean localIndex; + private final boolean mutable; + + public RollbackIT(boolean localIndex, boolean mutable) { + this.localIndex = localIndex; + this.mutable = mutable; + } + + @BeforeClass + @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(2); + props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true)); + // We need this b/c we don't allow a transactional table to be created if the underlying + // HBase table already exists (since we don't know if it was transactional before). + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Parameters(name="localIndex = {0} , mutable = {1}") + public static Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { + { false, false }, { false, true }, { true, false }, { true, true } + }); + } + + @Test + public void testRollbackOfUncommittedKeyValueIndexInsert() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + try { + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE DEMO(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX DEMO_idx ON DEMO (v1) INCLUDE(v2)"); + + stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')"); + + //assert values in data table + ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + //assert values in index table + rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + conn.rollback(); + + //assert values in data table + rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k"); + assertFalse(rs.next()); + + //assert values in index table + rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1"); + assertFalse(rs.next()); + } finally { + conn.close(); + } + } + + @Test + public void testRollbackOfUncommittedRowKeyIndexInsert() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + try { + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE DEMO(k VARCHAR, v1 VARCHAR, v2 VARCHAR, CONSTRAINT pk PRIMARY KEY (v1, v2))"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX DEMO_idx ON DEMO (v1, k)"); + + stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')"); + + ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1"); + + //assert values in data table + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + //assert values in index table + rs = stmt.executeQuery("select k, v1 from DEMO ORDER BY v2"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertFalse(rs.next()); + + conn.rollback(); + + //assert values in data table + rs = stmt.executeQuery("select k, v1, v2 from DEMO"); + assertFalse(rs.next()); + + //assert values in index table + rs = stmt.executeQuery("select k, v1 from DEMO ORDER BY v2"); + assertFalse(rs.next()); + } finally { + conn.close(); + } + } + +} + http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java new file mode 100644 index 0000000..205056b --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java @@ -0,0 +1,198 @@ +package org.apache.phoenix.end2end.index.txn; + +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; +import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; +import static org.apache.phoenix.util.TestUtil.LOCALHOST; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.hbase.index.Indexer; +import org.apache.phoenix.jdbc.PhoenixTestDriver; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Maps; + +@RunWith(Parameterized.class) +public class TxWriteFailureIT extends BaseTest { + + private static PhoenixTestDriver driver; + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final String SCHEMA_NAME = "S"; + private static final String DATA_TABLE_NAME = "T"; + private static final String INDEX_TABLE_NAME = "I"; + private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, DATA_TABLE_NAME); + private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, INDEX_TABLE_NAME); + private static final String ROW_TO_FAIL = "fail"; + + private final boolean localIndex; + private final boolean mutable; + + public TxWriteFailureIT(boolean localIndex, boolean mutable) { + this.localIndex = localIndex; + this.mutable = mutable; + } + + @BeforeClass + public static void setupCluster() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + setUpConfigForMiniCluster(conf); + conf.setClass("hbase.coprocessor.region.classes", FailingRegionObserver.class, RegionObserver.class); + conf.setBoolean("hbase.coprocessor.abortonerror", false); + conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false); + TEST_UTIL.startMiniCluster(); + String clientPort = TEST_UTIL.getConfiguration().get( + QueryServices.ZOOKEEPER_PORT_ATTRIB); + url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + + JDBC_PROTOCOL_SEPARATOR + clientPort + + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM; + + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + // Must update config before starting server + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true)); + driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator())); + clusterInitialized = true; + setupTxManager(); + } + + @Parameters(name="localIndex = {0} , mutable = {1}") + public static Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { + { false, false }, { false, true }, { true, false }, { true, true } + }); + } + + @Test + public void testIndexTableWriteFailure() throws Exception { + helpTestWriteFailure(true); + } + + @Test + public void testDataTableWriteFailure() throws Exception { + helpTestWriteFailure(false); + } + + private void helpTestWriteFailure(boolean indexTableWriteFailure) throws SQLException { + ResultSet rs; + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = driver.connect(url, props); + conn.setAutoCommit(false); + conn.createStatement().execute( + "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR PRIMARY KEY, v1 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + conn.createStatement().execute( + "CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1)"); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?)"); + // to create a data table write failure set k as the ROW_TO_FAIL, to create an index table write failure set v1 as the ROW_TO_FAIL, + // FailingRegionObserver will throw an exception if the put contains ROW_TO_FAIL + stmt.setString(1, !indexTableWriteFailure ? ROW_TO_FAIL : "k1"); + stmt.setString(2, indexTableWriteFailure ? ROW_TO_FAIL : "k2"); + stmt.execute(); + stmt.setString(1, "k2"); + stmt.setString(2, "v2"); + stmt.execute(); + try { + conn.commit(); + fail(); + } + catch (Exception e) { + conn.rollback(); + } + stmt.setString(1, "k3"); + stmt.setString(2, "v3"); + stmt.execute(); + //this should pass + conn.commit(); + + // verify that only k3,v3 exists in the data table + String dataSql = "SELECT k, v1 FROM " + DATA_TABLE_FULL_NAME + " order by k"; + rs = conn.createStatement().executeQuery("EXPLAIN "+dataSql); + assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER S.T", + QueryUtil.getExplainPlan(rs)); + rs = conn.createStatement().executeQuery(dataSql); + assertTrue(rs.next()); + assertEquals("k3", rs.getString(1)); + assertEquals("v3", rs.getString(2)); + assertFalse(rs.next()); + + // verify the only k3,v3 exists in the index table + String indexSql = "SELECT k, v1 FROM " + DATA_TABLE_FULL_NAME + " order by v1"; + rs = conn.createStatement().executeQuery("EXPLAIN "+indexSql); + if(localIndex) { + assertEquals( + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + DATA_TABLE_FULL_NAME + " [-32768]\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + "CLIENT MERGE SORT", + QueryUtil.getExplainPlan(rs)); + } else { + assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + INDEX_TABLE_FULL_NAME + "\n SERVER FILTER BY FIRST KEY ONLY", + QueryUtil.getExplainPlan(rs)); + } + rs = conn.createStatement().executeQuery(indexSql); + assertTrue(rs.next()); + assertEquals("k3", rs.getString(1)); + assertEquals("v3", rs.getString(2)); + assertFalse(rs.next()); + + conn.createStatement().execute("DROP TABLE " + DATA_TABLE_FULL_NAME); + } + + + public static class FailingRegionObserver extends SimpleRegionObserver { + @Override + public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, + final Durability durability) throws HBaseIOException { + if (shouldFailUpsert(c, put)) { + // throwing anything other than instances of IOException result + // in this coprocessor being unloaded + // DoNotRetryIOException tells HBase not to retry this mutation + // multiple times + throw new DoNotRetryIOException(); + } + } + + private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) { + return Bytes.contains(put.getRow(), Bytes.toBytes(ROW_TO_FAIL)); + } + + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 38e8ae7..52b5b5f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -10,6 +10,7 @@ package org.apache.phoenix.tx; import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -21,15 +22,18 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.Map; +import java.util.Properties; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.end2end.Shadower; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.util.DateUtil; +import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.TestUtil; import org.junit.Before; @@ -46,13 +50,6 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { public void setUp() throws SQLException { ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE); } - - @BeforeClass - @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) - public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(3); - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - } @Test public void testReadOwnWrites() throws Exception { @@ -248,4 +245,5 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME + " SET IMMUTABLE_ROWS=true"); testRowConflicts(); } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java index 9fa69de..63d4851 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java @@ -26,21 +26,59 @@ import static org.junit.Assert.assertTrue; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; import java.util.Properties; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.end2end.Shadower; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Maps; import co.cask.tephra.Transaction.VisibilityLevel; +@RunWith(Parameterized.class) public class TxCheckpointIT extends BaseHBaseManagedTimeIT { + + private final boolean localIndex; + private final boolean mutable; + + public TxCheckpointIT(boolean localIndex, boolean mutable) { + this.localIndex = localIndex; + this.mutable = mutable; + } + + @BeforeClass + @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(2); + props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true)); + // We need this b/c we don't allow a transactional table to be created if the underlying + // HBase table already exists (since we don't know if it was transactional before). + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Parameters(name="localIndex = {0} , mutable = {1}") + public static Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { + { false, false }, { false, true }, { true, false }, { true, true } + }); + } - @Test public void testUpsertSelectDoesntSeeUpsertedData() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -50,7 +88,8 @@ public class TxCheckpointIT extends BaseHBaseManagedTimeIT { Connection conn = DriverManager.getConnection(getUrl(), props); conn.setAutoCommit(true); conn.createStatement().execute("CREATE SEQUENCE keys"); - conn.createStatement().execute("CREATE TABLE txfoo (pk INTEGER PRIMARY KEY, val INTEGER) TRANSACTIONAL=true"); + conn.createStatement().execute("CREATE TABLE txfoo (pk INTEGER PRIMARY KEY, val INTEGER)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + conn.createStatement().execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX idx ON txfoo (val)"); conn.createStatement().execute("UPSERT INTO txfoo VALUES (NEXT VALUE FOR keys,1)"); for (int i=0; i<6; i++) { @@ -62,150 +101,239 @@ public class TxCheckpointIT extends BaseHBaseManagedTimeIT { } @Test - public void testCheckpointForUpsertSelect() throws Exception { - ResultSet rs; + public void testRollbackOfUncommittedDelete() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); - conn.createStatement().execute("create table tx1 (id bigint not null primary key) TRANSACTIONAL=true"); - conn.createStatement().execute("create table tx2 (id bigint not null primary key) TRANSACTIONAL=true"); - - conn.createStatement().execute("upsert into tx1 values (1)"); - conn.createStatement().execute("upsert into tx1 values (2)"); - conn.createStatement().execute("upsert into tx1 values (3)"); - conn.commit(); - - MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState(); - state.startTransaction(); - long wp = state.getWritePointer(); - conn.createStatement().execute("upsert into tx1 select max(id)+1 from tx1"); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); - assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move - rs = conn.createStatement().executeQuery("select max(id) from tx1"); - - assertTrue(rs.next()); - assertEquals(4,rs.getLong(1)); - assertFalse(rs.next()); - - conn.createStatement().execute("upsert into tx1 select max(id)+1 from tx1"); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); - assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moves - wp = state.getWritePointer(); - - conn.createStatement().execute("upsert into tx1 select id from tx2"); - assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel()); - // Write ptr shouldn't move b/c we're not reading from a table with uncommitted data - assertEquals(wp, state.getWritePointer()); - - rs = conn.createStatement().executeQuery("select max(id) from tx1"); - - assertTrue(rs.next()); - assertEquals(5,rs.getLong(1)); - assertFalse(rs.next()); - - conn.rollback(); - - rs = conn.createStatement().executeQuery("select max(id) from tx1"); - - assertTrue(rs.next()); - assertEquals(3,rs.getLong(1)); - assertFalse(rs.next()); - - wp = state.getWritePointer(); - conn.createStatement().execute("upsert into tx1 select max(id)+1 from tx1"); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); - assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move - rs = conn.createStatement().executeQuery("select max(id) from tx1"); - - assertTrue(rs.next()); - assertEquals(4,rs.getLong(1)); - assertFalse(rs.next()); - - conn.createStatement().execute("upsert into tx1 select max(id)+1 from tx1"); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); - assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moves - rs = conn.createStatement().executeQuery("select max(id) from tx1"); - - assertTrue(rs.next()); - assertEquals(5,rs.getLong(1)); - assertFalse(rs.next()); - - conn.commit(); - - rs = conn.createStatement().executeQuery("select max(id) from tx1"); - - assertTrue(rs.next()); - assertEquals(5,rs.getLong(1)); - assertFalse(rs.next()); - } + conn.setAutoCommit(false); + try { + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE DEMO(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX DEMO_idx ON DEMO (v1) INCLUDE(v2)"); + + stmt.executeUpdate("upsert into DEMO values('x1', 'y1', 'a1')"); + stmt.executeUpdate("upsert into DEMO values('x2', 'y2', 'a2')"); + + //assert values in data table + ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k"); + assertTrue(rs.next()); + assertEquals("x1", rs.getString(1)); + assertEquals("y1", rs.getString(2)); + assertEquals("a1", rs.getString(3)); + assertTrue(rs.next()); + assertEquals("x2", rs.getString(1)); + assertEquals("y2", rs.getString(2)); + assertEquals("a2", rs.getString(3)); + assertFalse(rs.next()); + + //assert values in index table + rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1"); + assertTrue(rs.next()); + assertEquals("x1", rs.getString(1)); + assertEquals("y1", rs.getString(2)); + assertEquals("a1", rs.getString(3)); + assertTrue(rs.next()); + assertEquals("x2", rs.getString(1)); + assertEquals("y2", rs.getString(2)); + assertEquals("a2", rs.getString(3)); + assertFalse(rs.next()); + + conn.commit(); + + stmt.executeUpdate("DELETE FROM DEMO WHERE k='x1' AND v1='y1' AND v2='a1'"); + //assert row is delete in data table + rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k"); + assertTrue(rs.next()); + assertEquals("x2", rs.getString(1)); + assertEquals("y2", rs.getString(2)); + assertEquals("a2", rs.getString(3)); + assertFalse(rs.next()); + + //assert row is delete in index table + rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1"); + assertTrue(rs.next()); + assertEquals("x2", rs.getString(1)); + assertEquals("y2", rs.getString(2)); + assertEquals("a2", rs.getString(3)); + assertFalse(rs.next()); + + conn.rollback(); + + //assert two rows in data table + rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k"); + assertTrue(rs.next()); + assertEquals("x1", rs.getString(1)); + assertEquals("y1", rs.getString(2)); + assertEquals("a1", rs.getString(3)); + assertTrue(rs.next()); + assertEquals("x2", rs.getString(1)); + assertEquals("y2", rs.getString(2)); + assertEquals("a2", rs.getString(3)); + assertFalse(rs.next()); + + //assert two rows in index table + rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1"); + assertTrue(rs.next()); + assertEquals("x1", rs.getString(1)); + assertEquals("y1", rs.getString(2)); + assertEquals("a1", rs.getString(3)); + assertTrue(rs.next()); + assertEquals("x2", rs.getString(1)); + assertEquals("y2", rs.getString(2)); + assertEquals("a2", rs.getString(3)); + assertFalse(rs.next()); + } finally { + conn.close(); + } + } + + @Test + public void testCheckpointForUpsertSelect() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props);) { + conn.setAutoCommit(false); + Statement stmt = conn.createStatement(); - @Test + stmt.execute("CREATE TABLE DEMO(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + + (!mutable ? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + + "INDEX IDX ON DEMO (v1) INCLUDE(v2)"); + + stmt.executeUpdate("upsert into DEMO values(1, 'a2', 'b1')"); + stmt.executeUpdate("upsert into DEMO values(2, 'a2', 'b2')"); + stmt.executeUpdate("upsert into DEMO values(3, 'a3', 'b3')"); + conn.commit(); + + upsertRows(conn); + conn.rollback(); + verifyRows(conn, 3); + + upsertRows(conn); + conn.commit(); + verifyRows(conn, 6); + } + } + + private void verifyRows(Connection conn, int expectedMaxId) throws SQLException { + ResultSet rs; + //query the data table + rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ max(id) from DEMO"); + assertTrue(rs.next()); + assertEquals(expectedMaxId, rs.getLong(1)); + assertFalse(rs.next()); + + // query the index + rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ max(id) from DEMO"); + assertTrue(rs.next()); + assertEquals(expectedMaxId, rs.getLong(1)); + assertFalse(rs.next()); + } + + private void upsertRows(Connection conn) throws SQLException { + ResultSet rs; + MutationState state = conn.unwrap(PhoenixConnection.class) + .getMutationState(); + state.startTransaction(); + long wp = state.getWritePointer(); + conn.createStatement().execute( + "upsert into DEMO select max(id)+1, 'a4', 'b4' from DEMO"); + assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, + state.getVisibilityLevel()); + assertEquals(wp, state.getWritePointer()); // Make sure write ptr + // didn't move + rs = conn.createStatement().executeQuery("select max(id) from DEMO"); + + assertTrue(rs.next()); + assertEquals(4, rs.getLong(1)); + assertFalse(rs.next()); + + conn.createStatement().execute( + "upsert into DEMO select max(id)+1, 'a5', 'b5' from DEMO"); + assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, + state.getVisibilityLevel()); + assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr + // moves + wp = state.getWritePointer(); + rs = conn.createStatement().executeQuery("select max(id) from DEMO"); + + assertTrue(rs.next()); + assertEquals(5, rs.getLong(1)); + assertFalse(rs.next()); + + conn.createStatement().execute( + "upsert into DEMO select max(id)+1, 'a6', 'b6' from DEMO"); + assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, + state.getVisibilityLevel()); + assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr + // moves + wp = state.getWritePointer(); + rs = conn.createStatement().executeQuery("select max(id) from DEMO"); + + assertTrue(rs.next()); + assertEquals(6, rs.getLong(1)); + assertFalse(rs.next()); + } + + @Test public void testCheckpointForDelete() throws Exception { - ResultSet rs; - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.createStatement().execute("create table tx3 (id1 bigint primary key, fk1 integer) TRANSACTIONAL=true"); - conn.createStatement().execute("create table tx4 (id2 bigint primary key, fk2 integer) TRANSACTIONAL=true"); - - conn.createStatement().execute("upsert into tx3 values (1, 3)"); - conn.createStatement().execute("upsert into tx3 values (2, 2)"); - conn.createStatement().execute("upsert into tx3 values (3, 1)"); - conn.createStatement().execute("upsert into tx4 values (1, 1)"); - conn.commit(); - - MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState(); - state.startTransaction(); - long wp = state.getWritePointer(); - conn.createStatement().execute("delete from tx3 where id1=fk1"); - assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel()); - assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move - - rs = conn.createStatement().executeQuery("select id1 from tx3"); - assertTrue(rs.next()); - assertEquals(1,rs.getLong(1)); - assertTrue(rs.next()); - assertEquals(3,rs.getLong(1)); - assertFalse(rs.next()); - - conn.createStatement().execute("delete from tx3 where id1 in (select fk1 from tx3 join tx4 on (fk2=id1))"); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); - assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved - - rs = conn.createStatement().executeQuery("select id1 from tx3"); - assertTrue(rs.next()); - assertEquals(1,rs.getLong(1)); - assertFalse(rs.next()); - - /* - * TODO: file Tephra JIRA, as this fails with an NPE because the - * ActionChange has a null family since we're issuing row deletes. - * See this code in TransactionAwareHTable.transactionalizeAction(Delete) - * and try modifying addToChangeSet(deleteRow, null, null); - * to modifying addToChangeSet(deleteRow, family, null); - } else { - for (Map.Entry<byte [], List<Cell>> familyEntry : familyToDelete.entrySet()) { - byte[] family = familyEntry.getKey(); - List<Cell> entries = familyEntry.getValue(); - boolean isFamilyDelete = false; - if (entries.size() == 1) { - Cell cell = entries.get(0); - isFamilyDelete = CellUtil.isDeleteFamily(cell); - } - if (isFamilyDelete) { - if (conflictLevel == TxConstants.ConflictDetection.ROW || - conflictLevel == TxConstants.ConflictDetection.NONE) { - // no need to identify individual columns deleted - txDelete.deleteFamily(family); - addToChangeSet(deleteRow, null, null); - */ -// conn.rollback(); -// rs = conn.createStatement().executeQuery("select id1 from tx3"); -// assertTrue(rs.next()); -// assertEquals(1,rs.getLong(1)); -// assertTrue(rs.next()); -// assertEquals(2,rs.getLong(1)); -// assertTrue(rs.next()); -// assertEquals(3,rs.getLong(1)); -// assertFalse(rs.next()); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + ResultSet rs; + try (Connection conn = DriverManager.getConnection(getUrl(), props);) { + conn.setAutoCommit(false); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE DEMO1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)" + + (!mutable ? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE TABLE DEMO2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)" + + (!mutable ? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + + "INDEX IDX ON DEMO1 (FK1B)"); + + stmt.executeUpdate("upsert into DEMO1 values (1, 3, 3)"); + stmt.executeUpdate("upsert into DEMO1 values (2, 2, 2)"); + stmt.executeUpdate("upsert into DEMO1 values (3, 1, 1)"); + stmt.executeUpdate("upsert into DEMO2 values (1, 1)"); + conn.commit(); + MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState(); + state.startTransaction(); + long wp = state.getWritePointer(); + conn.createStatement().execute("delete from DEMO1 where id1=fk1b AND fk1b=id1"); + assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel()); + assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move + + rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from DEMO1"); + assertTrue(rs.next()); + assertEquals(1,rs.getLong(1)); + assertTrue(rs.next()); + assertEquals(3,rs.getLong(1)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from DEMO1"); + assertTrue(rs.next()); + assertEquals(3,rs.getLong(1)); + assertTrue(rs.next()); + assertEquals(1,rs.getLong(1)); + assertFalse(rs.next()); + + conn.createStatement().execute("delete from DEMO1 where id1 in (select fk1a from DEMO1 join DEMO2 on (fk2=id1))"); + assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); + assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved + + rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from DEMO1"); + assertTrue(rs.next()); + assertEquals(1,rs.getLong(1)); + assertFalse(rs.next()); + + conn.rollback(); + rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from DEMO1"); + assertTrue(rs.next()); + assertEquals(1,rs.getLong(1)); + assertTrue(rs.next()); + assertEquals(2,rs.getLong(1)); + assertTrue(rs.next()); + assertEquals(3,rs.getLong(1)); + assertFalse(rs.next()); + } } + + }
