http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java index 410dca5..b76d61d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java @@ -61,8 +61,8 @@ import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.query.BaseTest; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.util.DateUtil; @@ -85,27 +85,39 @@ public class IndexIT extends ParallelStatsDisabledIT { private final boolean mutable; private final String tableDDLOptions; - - public IndexIT(boolean localIndex, boolean mutable, boolean transactional) { + public IndexIT(boolean localIndex, boolean mutable, boolean transactional, boolean columnEncoded) { this.localIndex = localIndex; this.transactional = transactional; this.mutable = mutable; StringBuilder optionBuilder = new StringBuilder(); - if (!mutable) - optionBuilder.append(" IMMUTABLE_ROWS=true "); + if (!columnEncoded) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("COLUMN_ENCODED_BYTES=0"); + } + if (!mutable) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("IMMUTABLE_ROWS=true"); + if (!columnEncoded) { + optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); + } + } if (transactional) { - if (!(optionBuilder.length()==0)) + if (optionBuilder.length()!=0) optionBuilder.append(","); optionBuilder.append(" TRANSACTIONAL=true "); } this.tableDDLOptions = optionBuilder.toString(); } - @Parameters(name="IndexIT_localIndex={0},mutable={1},transactional={2}") // name is used by failsafe as file name in reports + @Parameters(name="IndexIT_localIndex={0},mutable={1},transactional={2},columnEncoded={3}") // name is used by failsafe as file name in reports public static Collection<Boolean[]> data() { return Arrays.asList(new Boolean[][] { - { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true }, - { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true } + { false, false, false, false }, { false, false, false, true }, { false, false, true, false }, { false, false, true, true }, + { false, true, false, false }, { false, true, false, true }, { false, true, true, false }, { false, true, true, true }, + { true, false, false, false }, { true, false, false, true }, { true, false, true, false }, { true, false, true, true }, + { true, true, false, false }, { true, true, false, true }, { true, true, true, false }, { true, true, true, true } }); } @@ -780,7 +792,7 @@ public class IndexIT extends ParallelStatsDisabledIT { conn.createStatement().execute( "CREATE TABLE " + testTable + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " - + (!tableDDLOptions.isEmpty() ? tableDDLOptions : "") + "SPLIT ON ('b')"); + + (!tableDDLOptions.isEmpty() ? tableDDLOptions : "") + " SPLIT ON ('b')"); query = "SELECT * FROM " + testTable; rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); @@ -808,23 +820,23 @@ public class IndexIT extends ParallelStatsDisabledIT { stmt.execute(); conn.commit(); - // make sure the index is working as expected - query = "SELECT * FROM " + fullIndexName; + query = "SELECT /*+ NO_INDEX */ * FROM " + testTable; rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); - assertEquals("x", rs.getString(1)); - assertEquals("1", rs.getString(2)); - assertEquals("a", rs.getString(3)); + assertEquals("a", rs.getString(1)); + assertEquals("x", rs.getString(2)); + assertEquals("1", rs.getString(3)); assertTrue(rs.next()); - assertEquals("y", rs.getString(1)); - assertEquals("2", rs.getString(2)); - assertEquals("b", rs.getString(3)); + assertEquals("b", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("2", rs.getString(3)); assertTrue(rs.next()); - assertEquals("z", rs.getString(1)); - assertEquals("3", rs.getString(2)); - assertEquals("c", rs.getString(3)); + assertEquals("c", rs.getString(1)); + assertEquals("z", rs.getString(2)); + assertEquals("3", rs.getString(3)); assertFalse(rs.next()); - + + // make sure the index is working as expected query = "SELECT * FROM " + testTable; rs = conn.createStatement().executeQuery("EXPLAIN " + query); if (localIndex) { @@ -897,7 +909,7 @@ public class IndexIT extends ParallelStatsDisabledIT { } else { assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullIndexName + " ['1']", QueryUtil.getExplainPlan(rs)); } - + rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); assertEquals("a",rs.getString(1));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java index e854f23..fb9776e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; @@ -48,6 +47,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SchemaUtil; @@ -115,7 +115,7 @@ public class IndexTestUtil { while ((hasValue = dataRowKeySchema.next(ptr, i, maxOffset)) != null) { if (hasValue) { PColumn dataColumn = dataPKColumns.get(i); - PColumn indexColumn = indexTable.getColumn(IndexUtil.getIndexColumnName(dataColumn)); + PColumn indexColumn = indexTable.getColumnForColumnName(IndexUtil.getIndexColumnName(dataColumn)); coerceDataValueToIndexValue(dataColumn, indexColumn, ptr); indexValues[indexColumn.getPosition()-indexOffset] = ptr.copyBytes(); } @@ -135,10 +135,11 @@ public class IndexTestUtil { for (Cell kv : entry.getValue()) { @SuppressWarnings("deprecation") byte[] cq = kv.getQualifier(); - if (Bytes.compareTo(QueryConstants.EMPTY_COLUMN_BYTES, cq) != 0) { + byte[] emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(dataTable).getFirst(); + if (Bytes.compareTo(emptyKVQualifier, cq) != 0) { try { - PColumn dataColumn = family.getColumn(cq); - PColumn indexColumn = indexTable.getColumn(IndexUtil.getIndexColumnName(family.getName().getString(), dataColumn.getName().getString())); + PColumn dataColumn = family.getPColumnForColumnQualifier(cq); + PColumn indexColumn = indexTable.getColumnForColumnName(IndexUtil.getIndexColumnName(family.getName().getString(), dataColumn.getName().getString())); ptr.set(kv.getValueArray(),kv.getValueOffset(),kv.getValueLength()); coerceDataValueToIndexValue(dataColumn, indexColumn, ptr); indexValues[indexPKColumns.indexOf(indexColumn)-indexOffset] = ptr.copyBytes(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index 1bfd8fb..7f289bf 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -284,6 +284,8 @@ public class MutableIndexFailureIT extends BaseTest { // verify index table has correct data validateDataWithIndex(conn, fullTableName, fullIndexName); validateDataWithIndex(conn, secondTableName, secondFullIndexName); + } finally { + FAIL_WRITE = false; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java index a8e1ecd..19de769 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java @@ -54,6 +54,7 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -67,12 +68,17 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { protected final boolean localIndex; private final String tableDDLOptions; - public MutableIndexIT(boolean localIndex, boolean transactional) { + public MutableIndexIT(boolean localIndex, boolean transactional, boolean columnEncoded) { this.localIndex = localIndex; StringBuilder optionBuilder = new StringBuilder(); if (transactional) { optionBuilder.append("TRANSACTIONAL=true"); } + if (!columnEncoded) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("COLUMN_ENCODED_BYTES=0"); + } this.tableDDLOptions = optionBuilder.toString(); } @@ -87,11 +93,13 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { return getConnection(props); } - @Parameters(name="MutableIndexIT_localIndex={0},transactional={1}") // name is used by failsafe as file name in reports + @Parameters(name="MutableIndexIT_localIndex={0},transactional={1},columnEncoded={2}") // name is used by failsafe as file name in reports public static Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { - { false, false }, { false, true }, { true, false }, { true, true } - }); + return Arrays.asList(new Boolean[][] { + { false, false, false }, { false, false, true }, + { false, true, false }, { false, true, true }, + { true, false, false }, { true, false, true }, + { true, true, false }, { true, true, true } }); } @Test @@ -613,11 +621,13 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { } @Test + @Ignore //TODO remove after PHOENIX-3585 is fixed public void testSplitDuringIndexScan() throws Exception { testSplitDuringIndexScan(false); } @Test + @Ignore //TODO remove after PHOENIX-3585 is fixed public void testSplitDuringIndexReverseScan() throws Exception { testSplitDuringIndexScan(true); } @@ -675,6 +685,7 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { } @Test + @Ignore //TODO remove after PHOENIX-3585 is fixed public void testIndexHalfStoreFileReader() throws Exception { Connection conn1 = getConnection(); HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java index 29f3758..5ae11bf 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java @@ -58,7 +58,7 @@ public class SaltedTableIT extends BaseClientManagedTimeIT { // 4abc123jkl444 try { // Upsert with no column specifies. - ensureTableCreated(getUrl(), TABLE_WITH_SALTING, TABLE_WITH_SALTING, splits, ts-2); + ensureTableCreated(getUrl(), TABLE_WITH_SALTING, TABLE_WITH_SALTING, splits, ts-2, null); String query = "UPSERT INTO " + TABLE_WITH_SALTING + " VALUES(?,?,?,?,?)"; PreparedStatement stmt = conn.prepareStatement(query); stmt.setInt(1, 1); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java new file mode 100644 index 0000000..badf39b --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.tx; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.Date; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.TestUtil; +import org.apache.tephra.TxConstants; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Lists; + +@RunWith(Parameterized.class) +public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { + + private final String tableDDLOptions; + + public ParameterizedTransactionIT(boolean mutable, boolean columnEncoded) { + StringBuilder optionBuilder = new StringBuilder("TRANSACTIONAL=true"); + if (!columnEncoded) { + optionBuilder.append(",COLUMN_ENCODED_BYTES=0"); + } + if (!mutable) { + optionBuilder.append(",IMMUTABLE_ROWS=true"); + if (!columnEncoded) { + optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); + } + } + this.tableDDLOptions = optionBuilder.toString(); + } + + @Parameters(name="TransactionIT_mutable={0},columnEncoded={1}") // name is used by failsafe as file name in reports + public static Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { + {false, false }, {false, true }, {true, false }, { true, true }, + }); + } + + @Test + public void testReadOwnWrites() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + String selectSql = "SELECT * FROM "+ fullTableName; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + conn.setAutoCommit(false); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertFalse(rs.next()); + + String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + // upsert two rows + TestUtil.setRowKeyColumns(stmt, 1); + stmt.execute(); + TestUtil.setRowKeyColumns(stmt, 2); + stmt.execute(); + + // verify rows can be read even though commit has not been called + rs = conn.createStatement().executeQuery(selectSql); + TestUtil.validateRowKeyColumns(rs, 1); + TestUtil.validateRowKeyColumns(rs, 2); + assertFalse(rs.next()); + + conn.commit(); + + // verify rows can be read after commit + rs = conn.createStatement().executeQuery(selectSql); + TestUtil.validateRowKeyColumns(rs, 1); + TestUtil.validateRowKeyColumns(rs, 2); + assertFalse(rs.next()); + } + } + + @Test + public void testTxnClosedCorrecty() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + String selectSql = "SELECT * FROM "+fullTableName; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + conn.setAutoCommit(false); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertFalse(rs.next()); + + String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + // upsert two rows + TestUtil.setRowKeyColumns(stmt, 1); + stmt.execute(); + TestUtil.setRowKeyColumns(stmt, 2); + stmt.execute(); + + // verify rows can be read even though commit has not been called + rs = conn.createStatement().executeQuery(selectSql); + TestUtil.validateRowKeyColumns(rs, 1); + TestUtil.validateRowKeyColumns(rs, 2); + // Long currentTx = rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp(); + assertFalse(rs.next()); + + conn.close(); + // start new connection + // conn.createStatement().executeQuery(selectSql); + // assertFalse("This transaction should not be on the invalid transactions", + // txManager.getCurrentState().getInvalid().contains(currentTx)); + } + } + + @Test + public void testAutoCommitQuerySingleTable() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + conn.setAutoCommit(true); + // verify no rows returned + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName); + assertFalse(rs.next()); + } + } + + @Test + public void testAutoCommitQueryMultiTables() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + conn.setAutoCommit(true); + // verify no rows returned + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName + " x JOIN " + fullTableName + " y ON (x.long_pk = y.int_pk)"); + assertFalse(rs.next()); + } + } + + @Test + public void testSelfJoin() throws Exception { + String t1 = generateUniqueName(); + String t2 = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + t1 + " (varchar_pk VARCHAR NOT NULL primary key, a.varchar_col1 VARCHAR, b.varchar_col2 VARCHAR)" + tableDDLOptions); + conn.createStatement().execute("create table " + t2 + " (varchar_pk VARCHAR NOT NULL primary key, a.varchar_col1 VARCHAR, b.varchar_col1 VARCHAR)" + tableDDLOptions); + // verify no rows returned + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + t1 + " x JOIN " + t1 + " y ON (x.varchar_pk = y.a.varchar_col1)"); + assertFalse(rs.next()); + rs = conn.createStatement().executeQuery("SELECT * FROM " + t2 + " x JOIN " + t2 + " y ON (x.varchar_pk = y.a.varchar_col1)"); + assertFalse(rs.next()); + } + } + + private void testRowConflicts(String fullTableName) throws Exception { + try (Connection conn1 = DriverManager.getConnection(getUrl()); + Connection conn2 = DriverManager.getConnection(getUrl())) { + conn1.setAutoCommit(false); + conn2.setAutoCommit(false); + String selectSql = "SELECT * FROM "+fullTableName; + conn1.setAutoCommit(false); + ResultSet rs = conn1.createStatement().executeQuery(selectSql); + boolean immutableRows = conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, fullTableName)).isImmutableRows(); + assertFalse(rs.next()); + // upsert row using conn1 + String upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn1.prepareStatement(upsertSql); + TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 10); + stmt.execute(); + // upsert row using conn2 + upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, ?, ?, ?)"; + stmt = conn2.prepareStatement(upsertSql); + TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 11); + stmt.execute(); + + conn1.commit(); + //second commit should fail + try { + conn2.commit(); + if (!immutableRows) fail(); + } + catch (SQLException e) { + if (immutableRows) fail(); + assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); + } + } + } + + @Test + public void testRowConflictDetected() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + testRowConflicts(fullTableName); + } + + @Test + public void testNoConflictDetectionForImmutableRows() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET IMMUTABLE_ROWS=true"); + testRowConflicts(fullTableName); + } + + @Test + public void testNonTxToTxTable() throws Exception { + String nonTxTableName = generateUniqueName(); + + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptions); + conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (1)"); + conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (2, 'a')"); + conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (3, 'b')"); + conn.commit(); + + String index = generateUniqueName(); + conn.createStatement().execute("CREATE INDEX " + index + " ON " + nonTxTableName + "(v)"); + // Reset empty column value to an empty value like it is pre-transactions + HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName)); + List<Put>puts = Lists.newArrayList(new Put(PInteger.INSTANCE.toBytes(1)), new Put(PInteger.INSTANCE.toBytes(2)), new Put(PInteger.INSTANCE.toBytes(3))); + for (Put put : puts) { + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY); + } + htable.put(puts); + + conn.createStatement().execute("ALTER TABLE " + nonTxTableName + " SET TRANSACTIONAL=true"); + + htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName)); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(index)); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + + conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (4, 'c')"); + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM " + nonTxTableName + " WHERE v IS NULL"); + assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, nonTxTableName)).isTransactional()); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertFalse(rs.next()); + conn.commit(); + + conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (5, 'd')"); + rs = conn.createStatement().executeQuery("SELECT k FROM " + nonTxTableName); + assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, index)).isTransactional()); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(2,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(4,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(5,rs.getInt(1)); + assertFalse(rs.next()); + conn.rollback(); + + rs = conn.createStatement().executeQuery("SELECT k FROM " + nonTxTableName); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(2,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(4,rs.getInt(1)); + assertFalse(rs.next()); + } + + @Ignore + @Test + public void testNonTxToTxTableFailure() throws Exception { + String nonTxTableName = generateUniqueName(); + + Connection conn = DriverManager.getConnection(getUrl()); + // Put table in SYSTEM schema to prevent attempts to update the cache after we disable SYSTEM.CATALOG + conn.createStatement().execute("CREATE TABLE SYSTEM." + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptions); + conn.createStatement().execute("UPSERT INTO SYSTEM." + nonTxTableName + " VALUES (1)"); + conn.commit(); + // Reset empty column value to an empty value like it is pre-transactions + HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName)); + Put put = new Put(PInteger.INSTANCE.toBytes(1)); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY); + htable.put(put); + + HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + try { + // This will succeed initially in updating the HBase metadata, but then will fail when + // the SYSTEM.CATALOG table is attempted to be updated, exercising the code to restore + // the coprocessors back to the non transactional ones. + conn.createStatement().execute("ALTER TABLE SYSTEM." + nonTxTableName + " SET TRANSACTIONAL=true"); + fail(); + } catch (SQLException e) { + assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled")); + } finally { + admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + admin.close(); + } + + ResultSet rs = conn.createStatement().executeQuery("SELECT k FROM SYSTEM." + nonTxTableName + " WHERE v IS NULL"); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertFalse(rs.next()); + + htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName)); + assertFalse(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices(). + getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)). + getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions()); + } + + @Test + public void testCreateTableToBeTransactional() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + String t1 = generateUniqueName(); + String t2 = generateUniqueName(); + String ddl = "CREATE TABLE " + t1 + " (k varchar primary key) " + tableDDLOptions; + conn.createStatement().execute(ddl); + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + PTable table = pconn.getTable(new PTableKey(null, t1)); + HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); + assertTrue(table.isTransactional()); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + + try { + ddl = "ALTER TABLE " + t1 + " SET transactional=false"; + conn.createStatement().execute(ddl); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); + } + + HBaseAdmin admin = pconn.getQueryServices().getAdmin(); + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(t2)); + desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)); + admin.createTable(desc); + ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true"; + conn.createStatement().execute(ddl); + assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf(t2)).getValue(TxConstants.READ_NON_TX_DATA)); + + // Should be ok, as HBase metadata should match existing metadata. + ddl = "CREATE TABLE IF NOT EXISTS " + t1 + " (k varchar primary key)"; + try { + conn.createStatement().execute(ddl); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); + } + ddl += " transactional=true"; + conn.createStatement().execute(ddl); + table = pconn.getTable(new PTableKey(null, t1)); + htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); + assertTrue(table.isTransactional()); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + } + + @Test + public void testCurrentDate() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + String selectSql = "SELECT current_date() FROM "+fullTableName; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + conn.setAutoCommit(false); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertFalse(rs.next()); + + String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + // upsert two rows + TestUtil.setRowKeyColumns(stmt, 1); + stmt.execute(); + conn.commit(); + + rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + Date date1 = rs.getDate(1); + assertFalse(rs.next()); + + Thread.sleep(1000); + + rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + Date date2 = rs.getDate(1); + assertFalse(rs.next()); + assertTrue("current_date() should change while executing multiple statements", date2.getTime() > date1.getTime()); + } + } + + + @Test + public void testParallelUpsertSelect() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512)); + props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); + props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + String fullTableName1 = generateUniqueName(); + String fullTableName2 = generateUniqueName(); + String sequenceName = "S_" + generateUniqueName(); + conn.createStatement().execute("CREATE SEQUENCE " + sequenceName); + conn.createStatement().execute("CREATE TABLE " + fullTableName1 + " (pk INTEGER PRIMARY KEY, val INTEGER) SALT_BUCKETS=4" + + (!tableDDLOptions.isEmpty()? "," : "") + tableDDLOptions); + conn.createStatement().execute("CREATE TABLE " + fullTableName2 + " (pk INTEGER PRIMARY KEY, val INTEGER)" + tableDDLOptions); + + for (int i = 0; i < 100; i++) { + conn.createStatement().execute("UPSERT INTO " + fullTableName1 + " VALUES (NEXT VALUE FOR " + sequenceName + ", " + (i%10) + ")"); + } + conn.commit(); + conn.setAutoCommit(true); + int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName2 + " SELECT pk, val FROM " + fullTableName1); + assertEquals(100,upsertCount); + conn.close(); + } + + @Test + public void testInflightPartialEval() throws SQLException { + + try (Connection conn = DriverManager.getConnection(getUrl())) { + String transactTableName = generateUniqueName(); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions); + + + try (Connection conn1 = DriverManager.getConnection(getUrl()); Connection conn2 = DriverManager.getConnection(getUrl())) { + conn1.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES ('a','b','x')"); + // Select to force uncommitted data to be written + ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + transactTableName); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("b", rs.getString(2)); + assertFalse(rs.next()); + + conn2.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES ('a','c','x')"); + // Select to force uncommitted data to be written + rs = conn2.createStatement().executeQuery("SELECT * FROM " + transactTableName ); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("c", rs.getString(2)); + assertFalse(rs.next()); + + // If the AndExpression were to see the uncommitted row from conn2, the filter would + // filter the row out early and no longer continue to evaluate other cells due to + // the way partial evaluation holds state. + rs = conn1.createStatement().executeQuery("SELECT * FROM " + transactTableName + " WHERE v1 != 'c' AND v2 = 'x'"); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("b", rs.getString(2)); + assertFalse(rs.next()); + + // Same as above for conn1 data + rs = conn2.createStatement().executeQuery("SELECT * FROM " + transactTableName + " WHERE v1 != 'b' AND v2 = 'x'"); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("c", rs.getString(2)); + assertFalse(rs.next()); + } + + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 1399f6c..f37d09b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -26,337 +26,110 @@ import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.DatabaseMetaData; -import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.List; import java.util.Properties; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.types.PInteger; -import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TestUtil; import org.apache.tephra.TxConstants; -import org.junit.Ignore; import org.junit.Test; -import com.google.common.collect.Lists; +public class TransactionIT extends ParallelStatsDisabledIT { -public class TransactionIT extends ParallelStatsDisabledIT { - @Test - public void testReadOwnWrites() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - String selectSql = "SELECT * FROM "+ fullTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); - conn.setAutoCommit(false); - ResultSet rs = conn.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - - String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - // upsert two rows - TestUtil.setRowKeyColumns(stmt, 1); - stmt.execute(); - TestUtil.setRowKeyColumns(stmt, 2); - stmt.execute(); - - // verify rows can be read even though commit has not been called - rs = conn.createStatement().executeQuery(selectSql); - TestUtil.validateRowKeyColumns(rs, 1); - TestUtil.validateRowKeyColumns(rs, 2); - assertFalse(rs.next()); - - conn.commit(); - - // verify rows can be read after commit - rs = conn.createStatement().executeQuery(selectSql); - TestUtil.validateRowKeyColumns(rs, 1); - TestUtil.validateRowKeyColumns(rs, 2); - assertFalse(rs.next()); - } + public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException { + String tableName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + stmt.execute("DROP TABLE " + tableName); + stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true"); + stmt.execute("CREATE INDEX " + tableName + "_IDX ON " + tableName + " (v1) INCLUDE(v2)"); + assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName)).isTransactional()); + assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName + "_IDX")).isTransactional()); } @Test - public void testTxnClosedCorrecty() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - String selectSql = "SELECT * FROM "+fullTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); + public void testRowTimestampDisabled() throws SQLException { + String tableName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); - ResultSet rs = conn.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - - String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - // upsert two rows - TestUtil.setRowKeyColumns(stmt, 1); - stmt.execute(); - TestUtil.setRowKeyColumns(stmt, 2); - stmt.execute(); - - // verify rows can be read even though commit has not been called - rs = conn.createStatement().executeQuery(selectSql); - TestUtil.validateRowKeyColumns(rs, 1); - TestUtil.validateRowKeyColumns(rs, 2); - // Long currentTx = rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp(); - assertFalse(rs.next()); - - conn.close(); - // start new connection - // conn.createStatement().executeQuery(selectSql); - // assertFalse("This transaction should not be on the invalid transactions", - // txManager.getCurrentState().getInvalid().contains(currentTx)); - } - } - - @Test - public void testAutoCommitQuerySingleTable() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); - conn.setAutoCommit(true); - // verify no rows returned - ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName); - assertFalse(rs.next()); - } - } - - @Test - public void testAutoCommitQueryMultiTables() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); - conn.setAutoCommit(true); - // verify no rows returned - ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName + " a JOIN " + fullTableName + " b ON (a.long_pk = b.int_pk)"); - assertFalse(rs.next()); - } - } - - @Test - public void testColConflicts() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - try (Connection conn1 = DriverManager.getConnection(getUrl()); - Connection conn2 = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn1, fullTableName); - conn1.setAutoCommit(false); - conn2.setAutoCommit(false); - String selectSql = "SELECT * FROM "+fullTableName; - conn1.setAutoCommit(false); - ResultSet rs = conn1.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - // upsert row using conn1 - String upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn1.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 10); - stmt.execute(); - // upsert row using conn2 - stmt = conn2.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 11); - stmt.execute(); - - conn1.commit(); - //second commit should fail + Statement stmt = conn.createStatement(); try { - conn2.commit(); + stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP)) TRANSACTIONAL=true"); fail(); - } - catch (SQLException e) { - assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); } - } - } - - private void testRowConflicts(String fullTableName) throws Exception { - try (Connection conn1 = DriverManager.getConnection(getUrl()); - Connection conn2 = DriverManager.getConnection(getUrl())) { - conn1.setAutoCommit(false); - conn2.setAutoCommit(false); - String selectSql = "SELECT * FROM "+fullTableName; - conn1.setAutoCommit(false); - ResultSet rs = conn1.createStatement().executeQuery(selectSql); - boolean immutableRows = conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, fullTableName)).isImmutableRows(); - assertFalse(rs.next()); - // upsert row using conn1 - String upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn1.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 10); - stmt.execute(); - // upsert row using conn2 - upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, ?, ?, ?)"; - stmt = conn2.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 11); - stmt.execute(); - - conn1.commit(); - //second commit should fail + catch(SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); + } + stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP))"); try { - conn2.commit(); - if (!immutableRows) fail(); - } - catch (SQLException e) { - if (immutableRows) fail(); - assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); + stmt.execute("ALTER TABLE " + tableName + " SET TRANSACTIONAL=true"); + fail(); + } + catch(SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); } } } @Test - public void testRowConflictDetected() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - Connection conn = DriverManager.getConnection(getUrl()); - TestUtil.createTransactionalTable(conn, fullTableName); - testRowConflicts(fullTableName); - } - - @Test - public void testNoConflictDetectionForImmutableRows() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - Connection conn = DriverManager.getConnection(getUrl()); - TestUtil.createTransactionalTable(conn, fullTableName); - conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET IMMUTABLE_ROWS=true"); - testRowConflicts(fullTableName); - } - - @Test - public void testNonTxToTxTable() throws Exception { - String nonTxTableName = generateUniqueName(); - String indexName = generateUniqueName() + "_IDX"; + public void testTransactionalTableMetadata() throws SQLException { - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)"); - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (1)"); - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (2, 'a')"); - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (3, 'b')"); - conn.commit(); - - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + nonTxTableName + "(v)"); - // Reset empty column value to an empty value like it is pre-transactions - HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName)); - List<Put>puts = Lists.newArrayList(new Put(PInteger.INSTANCE.toBytes(1)), new Put(PInteger.INSTANCE.toBytes(2)), new Put(PInteger.INSTANCE.toBytes(3))); - for (Put put : puts) { - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY); - } - htable.put(puts); - - conn.createStatement().execute("ALTER TABLE " + nonTxTableName + " SET TRANSACTIONAL=true"); - - htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName)); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); - htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + try (Connection conn = DriverManager.getConnection(getUrl())) { + String transactTableName = generateUniqueName(); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + + "TRANSACTIONAL=true"); + conn.commit(); - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (4, 'c')"); - ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM " + nonTxTableName + " WHERE v IS NULL"); - assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, nonTxTableName)).isTransactional()); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - assertFalse(rs.next()); - conn.commit(); - - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (5, 'd')"); - rs = conn.createStatement().executeQuery("SELECT k FROM " + nonTxTableName); - assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, indexName)).isTransactional()); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(2,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(3,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(4,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(5,rs.getInt(1)); - assertFalse(rs.next()); - conn.rollback(); - - rs = conn.createStatement().executeQuery("SELECT k FROM " + nonTxTableName); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(2,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(3,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(4,rs.getInt(1)); - assertFalse(rs.next()); + DatabaseMetaData dbmd = conn.getMetaData(); + ResultSet rs = dbmd.getTables(null, null, StringUtil.escapeLike(transactTableName), null); + assertTrue(rs.next()); + assertEquals("Transactional table was not marked as transactional in JDBC API.", + "true", rs.getString(PhoenixDatabaseMetaData.TRANSACTIONAL)); + + String nonTransactTableName = generateUniqueName(); + Statement stmt2 = conn.createStatement(); + stmt2.execute("CREATE TABLE " + nonTransactTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "); + conn.commit(); + + ResultSet rs2 = dbmd.getTables(null, null, StringUtil.escapeLike(nonTransactTableName), null); + assertTrue(rs2.next()); + assertEquals("Non-transactional table was marked as transactional in JDBC API.", + "false", rs2.getString(PhoenixDatabaseMetaData.TRANSACTIONAL)); + } } - @Ignore @Test - public void testNonTxToTxTableFailure() throws Exception { - String nonTxTableName = generateUniqueName(); - - Connection conn = DriverManager.getConnection(getUrl()); - // Put table in SYSTEM schema to prevent attempts to update the cache after we disable SYSTEM.CATALOG - conn.createStatement().execute("CREATE TABLE SYSTEM." + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)"); - conn.createStatement().execute("UPSERT INTO SYSTEM." + nonTxTableName + " VALUES (1)"); - conn.commit(); - // Reset empty column value to an empty value like it is pre-transactions - HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName)); - Put put = new Put(PInteger.INSTANCE.toBytes(1)); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY); - htable.put(put); - - HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); - try { - // This will succeed initially in updating the HBase metadata, but then will fail when - // the SYSTEM.CATALOG table is attempted to be updated, exercising the code to restore - // the coprocessors back to the non transactional ones. - conn.createStatement().execute("ALTER TABLE SYSTEM." + nonTxTableName + " SET TRANSACTIONAL=true"); + public void testOnDupKeyForTransactionalTable() throws Exception { + // TODO: we should support having a transactional table defined for a connectionless connection + try (Connection conn = DriverManager.getConnection(getUrl())) { + String transactTableName = generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + transactTableName + " (k integer not null primary key, v bigint) TRANSACTIONAL=true"); + conn.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1"); fail(); } catch (SQLException e) { - assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled")); - } finally { - admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); - admin.close(); + assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL.getErrorCode(), e.getErrorCode()); } - - ResultSet rs = conn.createStatement().executeQuery("SELECT k FROM SYSTEM." + nonTxTableName + " WHERE v IS NULL"); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - assertFalse(rs.next()); - - htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName)); - assertFalse(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); - assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices(). - getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)). - getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions()); } @Test @@ -439,120 +212,38 @@ public class TransactionIT extends ParallelStatsDisabledIT { } @Test - public void testCreateTableToBeTransactional() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - String t1 = generateUniqueName(); - String t2 = generateUniqueName(); - String ddl = "CREATE TABLE " + t1 + " (k varchar primary key) transactional=true"; - conn.createStatement().execute(ddl); - PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - PTable table = pconn.getTable(new PTableKey(null, t1)); - HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); - assertTrue(table.isTransactional()); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); - - try { - ddl = "ALTER TABLE " + t1 + " SET transactional=false"; - conn.createStatement().execute(ddl); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); - } - - HBaseAdmin admin = pconn.getQueryServices().getAdmin(); - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(t2)); - desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)); - admin.createTable(desc); - ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true"; - conn.createStatement().execute(ddl); - assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf(t2)).getValue(TxConstants.READ_NON_TX_DATA)); - - // Should be ok, as HBase metadata should match existing metadata. - ddl = "CREATE TABLE IF NOT EXISTS " + t1 + " (k varchar primary key)"; - try { - conn.createStatement().execute(ddl); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); - } - ddl += " transactional=true"; - conn.createStatement().execute(ddl); - table = pconn.getTable(new PTableKey(null, t1)); - htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); - assertTrue(table.isTransactional()); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); - } - - @Test - public void testCurrentDate() throws Exception { + public void testColConflicts() throws Exception { String transTableName = generateUniqueName(); String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - String selectSql = "SELECT current_date() FROM "+fullTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); - conn.setAutoCommit(false); - ResultSet rs = conn.createStatement().executeQuery(selectSql); + try (Connection conn1 = DriverManager.getConnection(getUrl()); + Connection conn2 = DriverManager.getConnection(getUrl())) { + TestUtil.createTransactionalTable(conn1, fullTableName); + conn1.setAutoCommit(false); + conn2.setAutoCommit(false); + String selectSql = "SELECT * FROM "+fullTableName; + conn1.setAutoCommit(false); + ResultSet rs = conn1.createStatement().executeQuery(selectSql); assertFalse(rs.next()); - - String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - // upsert two rows + // upsert row using conn1 + String upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn1.prepareStatement(upsertSql); TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 10); + stmt.execute(); + // upsert row using conn2 + stmt = conn2.prepareStatement(upsertSql); + TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 11); stmt.execute(); - conn.commit(); - - rs = conn.createStatement().executeQuery(selectSql); - assertTrue(rs.next()); - Date date1 = rs.getDate(1); - assertFalse(rs.next()); - - Thread.sleep(1000); - rs = conn.createStatement().executeQuery(selectSql); - assertTrue(rs.next()); - Date date2 = rs.getDate(1); - assertFalse(rs.next()); - assertTrue("current_date() should change while executing multiple statements", date2.getTime() > date1.getTime()); - } - } - - @Test - public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException { - String tableName = generateUniqueName(); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - stmt.execute("DROP TABLE " + tableName); - stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true"); - stmt.execute("CREATE INDEX " + tableName + "_IDX ON " + tableName + " (v1) INCLUDE(v2)"); - assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName)).isTransactional()); - assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName + "_IDX")).isTransactional()); - } - - @Test - public void testRowTimestampDisabled() throws SQLException { - String tableName = generateUniqueName(); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - conn.setAutoCommit(false); - Statement stmt = conn.createStatement(); - try { - stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP)) TRANSACTIONAL=true"); - fail(); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); - } - stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP))"); + conn1.commit(); + //second commit should fail try { - stmt.execute("ALTER TABLE " + tableName + " SET TRANSACTIONAL=true"); + conn2.commit(); fail(); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); + } + catch (SQLException e) { + assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); } } } @@ -600,118 +291,4 @@ public class TransactionIT extends ParallelStatsDisabledIT { conn.close(); } } - - @Test - public void testParallelUpsertSelect() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512)); - props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); - props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - String fullTableName1 = generateUniqueName(); - String fullTableName2 = generateUniqueName(); - String sequenceName = "S_" + generateUniqueName(); - conn.createStatement().execute("CREATE SEQUENCE " + sequenceName); - conn.createStatement().execute("CREATE TABLE " + fullTableName1 + " (pk INTEGER PRIMARY KEY, val INTEGER) SALT_BUCKETS=4,TRANSACTIONAL=true"); - conn.createStatement().execute("CREATE TABLE " + fullTableName2 + " (pk INTEGER PRIMARY KEY, val INTEGER) TRANSACTIONAL=true"); - - for (int i = 0; i < 100; i++) { - conn.createStatement().execute("UPSERT INTO " + fullTableName1 + " VALUES (NEXT VALUE FOR " + sequenceName + ", " + (i%10) + ")"); - } - conn.commit(); - conn.setAutoCommit(true); - int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName2 + " SELECT pk, val FROM " + fullTableName1); - assertEquals(100,upsertCount); - conn.close(); - } - - @Test - public void testTransactionalTableMetadata() throws SQLException { - - try (Connection conn = DriverManager.getConnection(getUrl())) { - String transactTableName = generateUniqueName(); - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + - "TRANSACTIONAL=true"); - conn.commit(); - - DatabaseMetaData dbmd = conn.getMetaData(); - ResultSet rs = dbmd.getTables(null, null, StringUtil.escapeLike(transactTableName), null); - assertTrue(rs.next()); - assertEquals("Transactional table was not marked as transactional in JDBC API.", - "true", rs.getString(PhoenixDatabaseMetaData.TRANSACTIONAL)); - - String nonTransactTableName = generateUniqueName(); - Statement stmt2 = conn.createStatement(); - stmt2.execute("CREATE TABLE " + nonTransactTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "); - conn.commit(); - - ResultSet rs2 = dbmd.getTables(null, null, StringUtil.escapeLike(nonTransactTableName), null); - assertTrue(rs2.next()); - assertEquals("Non-transactional table was marked as transactional in JDBC API.", - "false", rs2.getString(PhoenixDatabaseMetaData.TRANSACTIONAL)); - } - } - - @Test - public void testInflightPartialEval() throws SQLException { - - try (Connection conn = DriverManager.getConnection(getUrl())) { - String transactTableName = generateUniqueName(); - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + - "TRANSACTIONAL=true"); - - try (Connection conn1 = DriverManager.getConnection(getUrl()); Connection conn2 = DriverManager.getConnection(getUrl())) { - conn1.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES ('a','b','x')"); - // Select to force uncommitted data to be written - ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + transactTableName); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("b", rs.getString(2)); - assertFalse(rs.next()); - - conn2.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES ('a','c','x')"); - // Select to force uncommitted data to be written - rs = conn2.createStatement().executeQuery("SELECT * FROM " + transactTableName ); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("c", rs.getString(2)); - assertFalse(rs.next()); - - // If the AndExpression were to see the uncommitted row from conn2, the filter would - // filter the row out early and no longer continue to evaluate other cells due to - // the way partial evaluation holds state. - rs = conn1.createStatement().executeQuery("SELECT * FROM " + transactTableName + " WHERE v1 != 'c' AND v2 = 'x'"); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("b", rs.getString(2)); - assertFalse(rs.next()); - - // Same as above for conn1 data - rs = conn2.createStatement().executeQuery("SELECT * FROM " + transactTableName + " WHERE v1 != 'b' AND v2 = 'x'"); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("c", rs.getString(2)); - assertFalse(rs.next()); - } - - } - } - - - @Test - public void testOnDupKeyForTransactionalTable() throws Exception { - // TODO: we should support having a transactional table defined for a connectionless connection - try (Connection conn = DriverManager.getConnection(getUrl())) { - String transactTableName = generateUniqueName(); - conn.createStatement().execute("CREATE TABLE " + transactTableName + " (k integer not null primary key, v bigint) TRANSACTIONAL=true"); - conn.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1"); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL.getErrorCode(), e.getErrorCode()); - } - } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java index 246ecd4..cb3b4b3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java @@ -36,6 +36,7 @@ import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.tephra.Transaction.VisibilityLevel; @@ -48,12 +49,25 @@ import org.junit.runners.Parameterized.Parameters; public class TxCheckpointIT extends ParallelStatsDisabledIT { private final boolean localIndex; - private final boolean mutable; + private final String tableDDLOptions; - public TxCheckpointIT(boolean localIndex, boolean mutable) { + public TxCheckpointIT(boolean localIndex, boolean mutable, boolean columnEncoded) { + StringBuilder optionBuilder = new StringBuilder(); this.localIndex = localIndex; - this.mutable = mutable; - + if (!columnEncoded) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("COLUMN_ENCODED_BYTES=0"); + } + if (!mutable) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("IMMUTABLE_ROWS=true"); + if (!columnEncoded) { + optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); + } + } + this.tableDDLOptions = optionBuilder.toString(); } private static Connection getConnection() throws SQLException { @@ -66,10 +80,11 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { return conn; } - @Parameters(name="TxCheckpointIT_localIndex={0},mutable={1}") // name is used by failsafe as file name in reports + @Parameters(name="TxCheckpointIT_localIndex={0},mutable={1},columnEncoded={2}") // name is used by failsafe as file name in reports public static Collection<Boolean[]> data() { return Arrays.asList(new Boolean[][] { - { false, false }, { false, true }, { true, false }, { true, true } + { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true }, + { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true } }); } @@ -86,7 +101,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { Connection conn = getConnection(props); conn.setAutoCommit(true); conn.createStatement().execute("CREATE SEQUENCE "+seqName); - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)" + tableDDLOptions); conn.createStatement().execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + "(val)"); conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (NEXT VALUE FOR " + seqName + ",1)"); @@ -117,12 +132,11 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { } private void testRollbackOfUncommittedDelete(String indexDDL, String fullTableName) throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = getConnection(); conn.setAutoCommit(false); try { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); stmt.execute(indexDDL); stmt.executeUpdate("upsert into " + fullTableName + " values('x1', 'y1', 'a1')"); @@ -206,13 +220,11 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { String tableName = "TBL_" + generateUniqueName(); String indexName = "IDX_" + generateUniqueName(); String fullTableName = SchemaUtil.getTableName(tableName, tableName); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = getConnection()) { conn.setAutoCommit(false); Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" - + (!mutable ? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE TABLE " + fullTableName + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)"); @@ -301,10 +313,8 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { try (Connection conn = getConnection()) { conn.setAutoCommit(false); Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)" - + (!mutable ? " IMMUTABLE_ROWS=true" : "")); - stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)" - + (!mutable ? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)" + tableDDLOptions); + stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)" + tableDDLOptions); stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + "1 (FK1B)"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index 0383251..18e4034 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -56,6 +56,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCac import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService; +import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; @@ -215,6 +216,7 @@ public class ServerCacheClient { } builder.setCacheId(ByteStringer.wrap(cacheId)); builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr)); + builder.setHasProtoBufIndexMaintainer(true); ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder(); svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName()); builder.setCacheFactory(svrCacheFactoryBuider.build()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java index 5c33967..d30f5dd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java @@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager; public interface TenantCache { MemoryManager getMemoryManager(); Closeable getServerCache(ImmutableBytesPtr cacheId); - Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException; + Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer) throws SQLException; void removeServerCache(ImmutableBytesPtr cacheId); void removeAllServerCache(); }