http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 1399f6c..f37d09b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -26,337 +26,110 @@ import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.DatabaseMetaData; -import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.List; import java.util.Properties; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.types.PInteger; -import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TestUtil; import org.apache.tephra.TxConstants; -import org.junit.Ignore; import org.junit.Test; -import com.google.common.collect.Lists; +public class TransactionIT extends ParallelStatsDisabledIT { -public class TransactionIT extends ParallelStatsDisabledIT { - @Test - public void testReadOwnWrites() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - String selectSql = "SELECT * FROM "+ fullTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); - conn.setAutoCommit(false); - ResultSet rs = conn.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - - String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - // upsert two rows - TestUtil.setRowKeyColumns(stmt, 1); - stmt.execute(); - TestUtil.setRowKeyColumns(stmt, 2); - stmt.execute(); - - // verify rows can be read even though commit has not been called - rs = conn.createStatement().executeQuery(selectSql); - TestUtil.validateRowKeyColumns(rs, 1); - TestUtil.validateRowKeyColumns(rs, 2); - assertFalse(rs.next()); - - conn.commit(); - - // verify rows can be read after commit - rs = conn.createStatement().executeQuery(selectSql); - TestUtil.validateRowKeyColumns(rs, 1); - TestUtil.validateRowKeyColumns(rs, 2); - assertFalse(rs.next()); - } + public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException { + String tableName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + stmt.execute("DROP TABLE " + tableName); + stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true"); + stmt.execute("CREATE INDEX " + tableName + "_IDX ON " + tableName + " (v1) INCLUDE(v2)"); + assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName)).isTransactional()); + assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName + "_IDX")).isTransactional()); } @Test - public void testTxnClosedCorrecty() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - String selectSql = "SELECT * FROM "+fullTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); + public void testRowTimestampDisabled() throws SQLException { + String tableName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); - ResultSet rs = conn.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - - String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - // upsert two rows - TestUtil.setRowKeyColumns(stmt, 1); - stmt.execute(); - TestUtil.setRowKeyColumns(stmt, 2); - stmt.execute(); - - // verify rows can be read even though commit has not been called - rs = conn.createStatement().executeQuery(selectSql); - TestUtil.validateRowKeyColumns(rs, 1); - TestUtil.validateRowKeyColumns(rs, 2); - // Long currentTx = rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp(); - assertFalse(rs.next()); - - conn.close(); - // start new connection - // conn.createStatement().executeQuery(selectSql); - // assertFalse("This transaction should not be on the invalid transactions", - // txManager.getCurrentState().getInvalid().contains(currentTx)); - } - } - - @Test - public void testAutoCommitQuerySingleTable() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); - conn.setAutoCommit(true); - // verify no rows returned - ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName); - assertFalse(rs.next()); - } - } - - @Test - public void testAutoCommitQueryMultiTables() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); - conn.setAutoCommit(true); - // verify no rows returned - ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName + " a JOIN " + fullTableName + " b ON (a.long_pk = b.int_pk)"); - assertFalse(rs.next()); - } - } - - @Test - public void testColConflicts() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - try (Connection conn1 = DriverManager.getConnection(getUrl()); - Connection conn2 = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn1, fullTableName); - conn1.setAutoCommit(false); - conn2.setAutoCommit(false); - String selectSql = "SELECT * FROM "+fullTableName; - conn1.setAutoCommit(false); - ResultSet rs = conn1.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - // upsert row using conn1 - String upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn1.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 10); - stmt.execute(); - // upsert row using conn2 - stmt = conn2.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 11); - stmt.execute(); - - conn1.commit(); - //second commit should fail + Statement stmt = conn.createStatement(); try { - conn2.commit(); + stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP)) TRANSACTIONAL=true"); fail(); - } - catch (SQLException e) { - assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); } - } - } - - private void testRowConflicts(String fullTableName) throws Exception { - try (Connection conn1 = DriverManager.getConnection(getUrl()); - Connection conn2 = DriverManager.getConnection(getUrl())) { - conn1.setAutoCommit(false); - conn2.setAutoCommit(false); - String selectSql = "SELECT * FROM "+fullTableName; - conn1.setAutoCommit(false); - ResultSet rs = conn1.createStatement().executeQuery(selectSql); - boolean immutableRows = conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, fullTableName)).isImmutableRows(); - assertFalse(rs.next()); - // upsert row using conn1 - String upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn1.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 10); - stmt.execute(); - // upsert row using conn2 - upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, ?, ?, ?)"; - stmt = conn2.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 11); - stmt.execute(); - - conn1.commit(); - //second commit should fail + catch(SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); + } + stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP))"); try { - conn2.commit(); - if (!immutableRows) fail(); - } - catch (SQLException e) { - if (immutableRows) fail(); - assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); + stmt.execute("ALTER TABLE " + tableName + " SET TRANSACTIONAL=true"); + fail(); + } + catch(SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); } } } @Test - public void testRowConflictDetected() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - Connection conn = DriverManager.getConnection(getUrl()); - TestUtil.createTransactionalTable(conn, fullTableName); - testRowConflicts(fullTableName); - } - - @Test - public void testNoConflictDetectionForImmutableRows() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - Connection conn = DriverManager.getConnection(getUrl()); - TestUtil.createTransactionalTable(conn, fullTableName); - conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET IMMUTABLE_ROWS=true"); - testRowConflicts(fullTableName); - } - - @Test - public void testNonTxToTxTable() throws Exception { - String nonTxTableName = generateUniqueName(); - String indexName = generateUniqueName() + "_IDX"; + public void testTransactionalTableMetadata() throws SQLException { - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)"); - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (1)"); - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (2, 'a')"); - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (3, 'b')"); - conn.commit(); - - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + nonTxTableName + "(v)"); - // Reset empty column value to an empty value like it is pre-transactions - HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName)); - List<Put>puts = Lists.newArrayList(new Put(PInteger.INSTANCE.toBytes(1)), new Put(PInteger.INSTANCE.toBytes(2)), new Put(PInteger.INSTANCE.toBytes(3))); - for (Put put : puts) { - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY); - } - htable.put(puts); - - conn.createStatement().execute("ALTER TABLE " + nonTxTableName + " SET TRANSACTIONAL=true"); - - htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName)); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); - htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + try (Connection conn = DriverManager.getConnection(getUrl())) { + String transactTableName = generateUniqueName(); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + + "TRANSACTIONAL=true"); + conn.commit(); - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (4, 'c')"); - ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM " + nonTxTableName + " WHERE v IS NULL"); - assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, nonTxTableName)).isTransactional()); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - assertFalse(rs.next()); - conn.commit(); - - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (5, 'd')"); - rs = conn.createStatement().executeQuery("SELECT k FROM " + nonTxTableName); - assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, indexName)).isTransactional()); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(2,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(3,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(4,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(5,rs.getInt(1)); - assertFalse(rs.next()); - conn.rollback(); - - rs = conn.createStatement().executeQuery("SELECT k FROM " + nonTxTableName); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(2,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(3,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(4,rs.getInt(1)); - assertFalse(rs.next()); + DatabaseMetaData dbmd = conn.getMetaData(); + ResultSet rs = dbmd.getTables(null, null, StringUtil.escapeLike(transactTableName), null); + assertTrue(rs.next()); + assertEquals("Transactional table was not marked as transactional in JDBC API.", + "true", rs.getString(PhoenixDatabaseMetaData.TRANSACTIONAL)); + + String nonTransactTableName = generateUniqueName(); + Statement stmt2 = conn.createStatement(); + stmt2.execute("CREATE TABLE " + nonTransactTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "); + conn.commit(); + + ResultSet rs2 = dbmd.getTables(null, null, StringUtil.escapeLike(nonTransactTableName), null); + assertTrue(rs2.next()); + assertEquals("Non-transactional table was marked as transactional in JDBC API.", + "false", rs2.getString(PhoenixDatabaseMetaData.TRANSACTIONAL)); + } } - @Ignore @Test - public void testNonTxToTxTableFailure() throws Exception { - String nonTxTableName = generateUniqueName(); - - Connection conn = DriverManager.getConnection(getUrl()); - // Put table in SYSTEM schema to prevent attempts to update the cache after we disable SYSTEM.CATALOG - conn.createStatement().execute("CREATE TABLE SYSTEM." + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)"); - conn.createStatement().execute("UPSERT INTO SYSTEM." + nonTxTableName + " VALUES (1)"); - conn.commit(); - // Reset empty column value to an empty value like it is pre-transactions - HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName)); - Put put = new Put(PInteger.INSTANCE.toBytes(1)); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY); - htable.put(put); - - HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); - try { - // This will succeed initially in updating the HBase metadata, but then will fail when - // the SYSTEM.CATALOG table is attempted to be updated, exercising the code to restore - // the coprocessors back to the non transactional ones. - conn.createStatement().execute("ALTER TABLE SYSTEM." + nonTxTableName + " SET TRANSACTIONAL=true"); + public void testOnDupKeyForTransactionalTable() throws Exception { + // TODO: we should support having a transactional table defined for a connectionless connection + try (Connection conn = DriverManager.getConnection(getUrl())) { + String transactTableName = generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + transactTableName + " (k integer not null primary key, v bigint) TRANSACTIONAL=true"); + conn.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1"); fail(); } catch (SQLException e) { - assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled")); - } finally { - admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); - admin.close(); + assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL.getErrorCode(), e.getErrorCode()); } - - ResultSet rs = conn.createStatement().executeQuery("SELECT k FROM SYSTEM." + nonTxTableName + " WHERE v IS NULL"); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - assertFalse(rs.next()); - - htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName)); - assertFalse(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); - assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices(). - getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)). - getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions()); } @Test @@ -439,120 +212,38 @@ public class TransactionIT extends ParallelStatsDisabledIT { } @Test - public void testCreateTableToBeTransactional() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - String t1 = generateUniqueName(); - String t2 = generateUniqueName(); - String ddl = "CREATE TABLE " + t1 + " (k varchar primary key) transactional=true"; - conn.createStatement().execute(ddl); - PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - PTable table = pconn.getTable(new PTableKey(null, t1)); - HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); - assertTrue(table.isTransactional()); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); - - try { - ddl = "ALTER TABLE " + t1 + " SET transactional=false"; - conn.createStatement().execute(ddl); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); - } - - HBaseAdmin admin = pconn.getQueryServices().getAdmin(); - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(t2)); - desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)); - admin.createTable(desc); - ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true"; - conn.createStatement().execute(ddl); - assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf(t2)).getValue(TxConstants.READ_NON_TX_DATA)); - - // Should be ok, as HBase metadata should match existing metadata. - ddl = "CREATE TABLE IF NOT EXISTS " + t1 + " (k varchar primary key)"; - try { - conn.createStatement().execute(ddl); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); - } - ddl += " transactional=true"; - conn.createStatement().execute(ddl); - table = pconn.getTable(new PTableKey(null, t1)); - htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); - assertTrue(table.isTransactional()); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); - } - - @Test - public void testCurrentDate() throws Exception { + public void testColConflicts() throws Exception { String transTableName = generateUniqueName(); String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - String selectSql = "SELECT current_date() FROM "+fullTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); - conn.setAutoCommit(false); - ResultSet rs = conn.createStatement().executeQuery(selectSql); + try (Connection conn1 = DriverManager.getConnection(getUrl()); + Connection conn2 = DriverManager.getConnection(getUrl())) { + TestUtil.createTransactionalTable(conn1, fullTableName); + conn1.setAutoCommit(false); + conn2.setAutoCommit(false); + String selectSql = "SELECT * FROM "+fullTableName; + conn1.setAutoCommit(false); + ResultSet rs = conn1.createStatement().executeQuery(selectSql); assertFalse(rs.next()); - - String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - // upsert two rows + // upsert row using conn1 + String upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn1.prepareStatement(upsertSql); TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 10); + stmt.execute(); + // upsert row using conn2 + stmt = conn2.prepareStatement(upsertSql); + TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 11); stmt.execute(); - conn.commit(); - - rs = conn.createStatement().executeQuery(selectSql); - assertTrue(rs.next()); - Date date1 = rs.getDate(1); - assertFalse(rs.next()); - - Thread.sleep(1000); - rs = conn.createStatement().executeQuery(selectSql); - assertTrue(rs.next()); - Date date2 = rs.getDate(1); - assertFalse(rs.next()); - assertTrue("current_date() should change while executing multiple statements", date2.getTime() > date1.getTime()); - } - } - - @Test - public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException { - String tableName = generateUniqueName(); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - stmt.execute("DROP TABLE " + tableName); - stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true"); - stmt.execute("CREATE INDEX " + tableName + "_IDX ON " + tableName + " (v1) INCLUDE(v2)"); - assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName)).isTransactional()); - assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName + "_IDX")).isTransactional()); - } - - @Test - public void testRowTimestampDisabled() throws SQLException { - String tableName = generateUniqueName(); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - conn.setAutoCommit(false); - Statement stmt = conn.createStatement(); - try { - stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP)) TRANSACTIONAL=true"); - fail(); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); - } - stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP))"); + conn1.commit(); + //second commit should fail try { - stmt.execute("ALTER TABLE " + tableName + " SET TRANSACTIONAL=true"); + conn2.commit(); fail(); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); + } + catch (SQLException e) { + assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); } } } @@ -600,118 +291,4 @@ public class TransactionIT extends ParallelStatsDisabledIT { conn.close(); } } - - @Test - public void testParallelUpsertSelect() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512)); - props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); - props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - String fullTableName1 = generateUniqueName(); - String fullTableName2 = generateUniqueName(); - String sequenceName = "S_" + generateUniqueName(); - conn.createStatement().execute("CREATE SEQUENCE " + sequenceName); - conn.createStatement().execute("CREATE TABLE " + fullTableName1 + " (pk INTEGER PRIMARY KEY, val INTEGER) SALT_BUCKETS=4,TRANSACTIONAL=true"); - conn.createStatement().execute("CREATE TABLE " + fullTableName2 + " (pk INTEGER PRIMARY KEY, val INTEGER) TRANSACTIONAL=true"); - - for (int i = 0; i < 100; i++) { - conn.createStatement().execute("UPSERT INTO " + fullTableName1 + " VALUES (NEXT VALUE FOR " + sequenceName + ", " + (i%10) + ")"); - } - conn.commit(); - conn.setAutoCommit(true); - int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName2 + " SELECT pk, val FROM " + fullTableName1); - assertEquals(100,upsertCount); - conn.close(); - } - - @Test - public void testTransactionalTableMetadata() throws SQLException { - - try (Connection conn = DriverManager.getConnection(getUrl())) { - String transactTableName = generateUniqueName(); - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + - "TRANSACTIONAL=true"); - conn.commit(); - - DatabaseMetaData dbmd = conn.getMetaData(); - ResultSet rs = dbmd.getTables(null, null, StringUtil.escapeLike(transactTableName), null); - assertTrue(rs.next()); - assertEquals("Transactional table was not marked as transactional in JDBC API.", - "true", rs.getString(PhoenixDatabaseMetaData.TRANSACTIONAL)); - - String nonTransactTableName = generateUniqueName(); - Statement stmt2 = conn.createStatement(); - stmt2.execute("CREATE TABLE " + nonTransactTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "); - conn.commit(); - - ResultSet rs2 = dbmd.getTables(null, null, StringUtil.escapeLike(nonTransactTableName), null); - assertTrue(rs2.next()); - assertEquals("Non-transactional table was marked as transactional in JDBC API.", - "false", rs2.getString(PhoenixDatabaseMetaData.TRANSACTIONAL)); - } - } - - @Test - public void testInflightPartialEval() throws SQLException { - - try (Connection conn = DriverManager.getConnection(getUrl())) { - String transactTableName = generateUniqueName(); - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + - "TRANSACTIONAL=true"); - - try (Connection conn1 = DriverManager.getConnection(getUrl()); Connection conn2 = DriverManager.getConnection(getUrl())) { - conn1.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES ('a','b','x')"); - // Select to force uncommitted data to be written - ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + transactTableName); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("b", rs.getString(2)); - assertFalse(rs.next()); - - conn2.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES ('a','c','x')"); - // Select to force uncommitted data to be written - rs = conn2.createStatement().executeQuery("SELECT * FROM " + transactTableName ); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("c", rs.getString(2)); - assertFalse(rs.next()); - - // If the AndExpression were to see the uncommitted row from conn2, the filter would - // filter the row out early and no longer continue to evaluate other cells due to - // the way partial evaluation holds state. - rs = conn1.createStatement().executeQuery("SELECT * FROM " + transactTableName + " WHERE v1 != 'c' AND v2 = 'x'"); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("b", rs.getString(2)); - assertFalse(rs.next()); - - // Same as above for conn1 data - rs = conn2.createStatement().executeQuery("SELECT * FROM " + transactTableName + " WHERE v1 != 'b' AND v2 = 'x'"); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("c", rs.getString(2)); - assertFalse(rs.next()); - } - - } - } - - - @Test - public void testOnDupKeyForTransactionalTable() throws Exception { - // TODO: we should support having a transactional table defined for a connectionless connection - try (Connection conn = DriverManager.getConnection(getUrl())) { - String transactTableName = generateUniqueName(); - conn.createStatement().execute("CREATE TABLE " + transactTableName + " (k integer not null primary key, v bigint) TRANSACTIONAL=true"); - conn.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1"); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL.getErrorCode(), e.getErrorCode()); - } - } - -} +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java index 246ecd4..cb3b4b3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java @@ -36,6 +36,7 @@ import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.tephra.Transaction.VisibilityLevel; @@ -48,12 +49,25 @@ import org.junit.runners.Parameterized.Parameters; public class TxCheckpointIT extends ParallelStatsDisabledIT { private final boolean localIndex; - private final boolean mutable; + private final String tableDDLOptions; - public TxCheckpointIT(boolean localIndex, boolean mutable) { + public TxCheckpointIT(boolean localIndex, boolean mutable, boolean columnEncoded) { + StringBuilder optionBuilder = new StringBuilder(); this.localIndex = localIndex; - this.mutable = mutable; - + if (!columnEncoded) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("COLUMN_ENCODED_BYTES=0"); + } + if (!mutable) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("IMMUTABLE_ROWS=true"); + if (!columnEncoded) { + optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); + } + } + this.tableDDLOptions = optionBuilder.toString(); } private static Connection getConnection() throws SQLException { @@ -66,10 +80,11 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { return conn; } - @Parameters(name="TxCheckpointIT_localIndex={0},mutable={1}") // name is used by failsafe as file name in reports + @Parameters(name="TxCheckpointIT_localIndex={0},mutable={1},columnEncoded={2}") // name is used by failsafe as file name in reports public static Collection<Boolean[]> data() { return Arrays.asList(new Boolean[][] { - { false, false }, { false, true }, { true, false }, { true, true } + { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true }, + { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true } }); } @@ -86,7 +101,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { Connection conn = getConnection(props); conn.setAutoCommit(true); conn.createStatement().execute("CREATE SEQUENCE "+seqName); - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)" + tableDDLOptions); conn.createStatement().execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + "(val)"); conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (NEXT VALUE FOR " + seqName + ",1)"); @@ -117,12 +132,11 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { } private void testRollbackOfUncommittedDelete(String indexDDL, String fullTableName) throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = getConnection(); conn.setAutoCommit(false); try { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); stmt.execute(indexDDL); stmt.executeUpdate("upsert into " + fullTableName + " values('x1', 'y1', 'a1')"); @@ -206,13 +220,11 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { String tableName = "TBL_" + generateUniqueName(); String indexName = "IDX_" + generateUniqueName(); String fullTableName = SchemaUtil.getTableName(tableName, tableName); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = getConnection()) { conn.setAutoCommit(false); Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" - + (!mutable ? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE TABLE " + fullTableName + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)"); @@ -301,10 +313,8 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { try (Connection conn = getConnection()) { conn.setAutoCommit(false); Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)" - + (!mutable ? " IMMUTABLE_ROWS=true" : "")); - stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)" - + (!mutable ? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)" + tableDDLOptions); + stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)" + tableDDLOptions); stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + "1 (FK1B)"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index 0383251..18e4034 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -56,6 +56,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCac import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService; +import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; @@ -215,6 +216,7 @@ public class ServerCacheClient { } builder.setCacheId(ByteStringer.wrap(cacheId)); builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr)); + builder.setHasProtoBufIndexMaintainer(true); ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder(); svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName()); builder.setCacheFactory(svrCacheFactoryBuider.build()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java index 5c33967..d30f5dd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java @@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager; public interface TenantCache { MemoryManager getMemoryManager(); Closeable getServerCache(ImmutableBytesPtr cacheId); - Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException; + Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer) throws SQLException; void removeServerCache(ImmutableBytesPtr cacheId); void removeAllServerCache(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java index 658b4cc..3d178f6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java @@ -104,11 +104,11 @@ public class TenantCacheImpl implements TenantCache { } @Override - public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException { + public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer) throws SQLException { MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length); boolean success = false; try { - Closeable element = cacheFactory.newCache(cachePtr, txState, chunk); + Closeable element = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer); getServerCaches().put(cacheId, element); success = true; return element; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java index 07df105..b482998 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java @@ -38,6 +38,7 @@ import org.apache.phoenix.expression.IsNullExpression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; +import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.visitor.StatelessTraverseNoExpressionVisitor; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; @@ -239,7 +240,7 @@ public class CreateTableCompiler { } } - private static class ViewWhereExpressionVisitor extends StatelessTraverseNoExpressionVisitor<Boolean> { + public static class ViewWhereExpressionVisitor extends StatelessTraverseNoExpressionVisitor<Boolean> { private boolean isUpdatable = true; private final PTable table; private int position; @@ -318,13 +319,18 @@ public class CreateTableCompiler { @Override public Boolean visit(KeyValueColumnExpression node) { try { - this.position = table.getColumnFamily(node.getColumnFamily()).getColumn(node.getColumnName()).getPosition(); + this.position = table.getColumnFamily(node.getColumnFamily()).getPColumnForColumnQualifier(node.getColumnQualifier()).getPosition(); } catch (SQLException e) { throw new RuntimeException(e); // Impossible } return Boolean.TRUE; } + @Override + public Boolean visit(SingleCellColumnExpression node) { + return visit(node.getKeyValueExpression()); + } + } private static class VarbinaryDatum implements PDatum { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 602cd6b..cee545a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -585,7 +585,7 @@ public class DeleteCompiler { if (ptr.getLength() > 0) { byte[] uuidValue = ServerCacheClient.generateId(); context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - context.getScan().setAttribute(PhoenixIndexCodec.INDEX_MD, ptr.get()); + context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } ResultIterator iterator = aggPlan.iterator(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java index c05918b..fb4c542 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java @@ -117,12 +117,26 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.TypeMismatchException; -import org.apache.phoenix.schema.types.*; +import org.apache.phoenix.schema.types.PArrayDataType; +import org.apache.phoenix.schema.types.PBoolean; +import org.apache.phoenix.schema.types.PChar; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PDate; +import org.apache.phoenix.schema.types.PDecimal; +import org.apache.phoenix.schema.types.PDouble; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PTimestamp; +import org.apache.phoenix.schema.types.PUnsignedTimestamp; +import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.schema.types.PhoenixArray; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.SchemaUtil; @@ -386,7 +400,7 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio } protected void addColumn(PColumn column) { - context.getScan().addColumn(column.getFamilyName().getBytes(), column.getName().getBytes()); + EncodedColumnsUtil.setColumns(column, context.getCurrentTable().getTable(), context.getScan()); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index 8e4d9aa..8ba0e12 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.compile; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; + import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; @@ -28,6 +30,7 @@ import java.util.Map; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.expression.Expression; @@ -71,6 +74,8 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; @@ -125,10 +130,12 @@ public class FromCompiler { throw new ColumnNotFoundException(schemaName, tableName, null, colName); } + @Override public PFunction resolveFunction(String functionName) throws SQLException { throw new FunctionNotFoundException(functionName); } + @Override public boolean hasUDFs() { return false; } @@ -257,7 +264,7 @@ public class FromCompiler { Expression sourceExpression = projector.getColumnProjector(column.getPosition()).getExpression(); PColumnImpl projectedColumn = new PColumnImpl(column.getName(), column.getFamilyName(), sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(), - column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic()); + column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic(), column.getColumnQualifierBytes()); projectedColumns.add(projectedColumn); } PTable t = PTableImpl.makePTable(table, projectedColumns); @@ -332,26 +339,28 @@ public class FromCompiler { private final String alias; private final List<PSchema> schemas; - public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp, Map<String, UDFParseNode> udfParseNodes, boolean isNamespaceMapped) throws SQLException { - super(connection, 0, false, udfParseNodes); - List<PColumnFamily> families = Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size()); - for (ColumnDef def : table.getDynamicColumns()) { - if (def.getColumnDefName().getFamilyName() != null) { - families.add(new PColumnFamilyImpl(PNameFactory.newName(def.getColumnDefName().getFamilyName()),Collections.<PColumn>emptyList())); - } + public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp, Map<String, UDFParseNode> udfParseNodes, boolean isNamespaceMapped) throws SQLException { + super(connection, 0, false, udfParseNodes); + List<PColumnFamily> families = Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size()); + for (ColumnDef def : table.getDynamicColumns()) { + if (def.getColumnDefName().getFamilyName() != null) { + families.add(new PColumnFamilyImpl(PNameFactory.newName(def.getColumnDefName().getFamilyName()),Collections.<PColumn>emptyList()));//, NON_ENCODED_QUALIFIERS)); + } } Long scn = connection.getSCN(); String schema = table.getName().getSchemaName(); if (connection.getSchema() != null) { schema = schema != null ? schema : connection.getSchema(); } - PTable theTable = new PTableImpl(connection.getTenantId(), schema, table.getName().getTableName(), + // Storage scheme and encoding scheme don't matter here since the PTable is being used only for the purposes of create table. + // The actual values of these two will be determined by the metadata client. + PTable theTable = new PTableImpl(connection.getTenantId(), schema, table.getName().getTableName(), scn == null ? HConstants.LATEST_TIMESTAMP : scn, families, isNamespaceMapped); - theTable = this.addDynamicColumns(table.getDynamicColumns(), theTable); - alias = null; - tableRefs = ImmutableList.of(new TableRef(alias, theTable, timeStamp, !table.getDynamicColumns().isEmpty())); - schemas = ImmutableList.of(new PSchema(theTable.getSchemaName().toString(), timeStamp)); - } + theTable = this.addDynamicColumns(table.getDynamicColumns(), theTable); + alias = null; + tableRefs = ImmutableList.of(new TableRef(alias, theTable, timeStamp, !table.getDynamicColumns().isEmpty())); + schemas = ImmutableList.of(new PSchema(theTable.getSchemaName().toString(), timeStamp)); + } public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException { this(connection, tableNode, updateCacheImmediately, 0, new HashMap<String,UDFParseNode>(1)); @@ -447,8 +456,8 @@ public class FromCompiler { } PColumn column = resolveCF - ? tableRef.getTable().getColumnFamily(tableName).getColumn(colName) - : tableRef.getTable().getColumn(colName); + ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName) + : tableRef.getTable().getPColumnForColumnName(colName); return new ColumnRef(tableRef, column.getPosition()); } @@ -672,7 +681,7 @@ public class FromCompiler { familyName = PNameFactory.newName(family); } allcolumns.add(new PColumnImpl(name, familyName, dynColumn.getDataType(), dynColumn.getMaxLength(), - dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false, true)); + dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false, true, Bytes.toBytes(dynColumn.getColumnDefName().getColumnName()))); position++; } theTable = PTableImpl.makePTable(theTable, allcolumns); @@ -774,16 +783,17 @@ public class FromCompiler { // referenced by an outer wild-card select. alias = String.valueOf(position); } + PName name = PNameFactory.newName(alias); PColumnImpl column = new PColumnImpl(PNameFactory.newName(alias), PNameFactory.newName(QueryConstants.DEFAULT_COLUMN_FAMILY), - null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false, false); + null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false, false, name.getBytes()); columns.add(column); } PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null, Collections.<PTable> emptyList(), false, Collections.<PName> emptyList(), null, null, false, false, false, null, null, null, false, false, 0, 0L, SchemaUtil - .isNamespaceMappingEnabled(PTableType.SUBQUERY, connection.getQueryServices().getProps()), null, false); + .isNamespaceMappingEnabled(PTableType.SUBQUERY, connection.getQueryServices().getProps()), null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER); String alias = subselectNode.getAlias(); TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false); @@ -858,7 +868,7 @@ public class FromCompiler { while (iterator.hasNext()) { TableRef tableRef = iterator.next(); try { - PColumn column = tableRef.getTable().getColumn(colName); + PColumn column = tableRef.getTable().getPColumnForColumnName(colName); if (theTableRef != null) { throw new AmbiguousColumnException(colName); } theTableRef = tableRef; theColumnPosition = column.getPosition(); @@ -871,12 +881,12 @@ public class FromCompiler { } else { try { TableRef tableRef = resolveTable(schemaName, tableName); - PColumn column = tableRef.getTable().getColumn(colName); + PColumn column = tableRef.getTable().getPColumnForColumnName(colName); return new ColumnRef(tableRef, column.getPosition()); } catch (TableNotFoundException e) { // Try using the tableName as a columnFamily reference instead ColumnFamilyRef cfRef = resolveColumnFamily(schemaName, tableName); - PColumn column = cfRef.getFamily().getColumn(colName); + PColumn column = cfRef.getFamily().getPColumnForColumnName(colName); return new ColumnRef(cfRef.getTableRef(), column.getPosition()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index e8c05ca..eef604b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.compile; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; +import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN; + import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; @@ -76,6 +79,8 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ProjectedColumn; @@ -93,6 +98,7 @@ import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.SchemaUtil; @@ -714,7 +720,7 @@ public class JoinCompiler { if (columnRef.getTableRef().equals(tableRef) && !SchemaUtil.isPKColumn(columnRef.getColumn()) && !(columnRef instanceof LocalIndexColumnRef)) { - scan.addColumn(columnRef.getColumn().getFamilyName().getBytes(), columnRef.getColumn().getName().getBytes()); + EncodedColumnsUtil.setColumns(columnRef.getColumn(), tableRef.getTable(), scan); } } } @@ -1284,7 +1290,7 @@ public class JoinCompiler { if (type == JoinType.Full) { for (PColumn c : left.getColumns()) { merged.add(new ProjectedColumn(c.getName(), c.getFamilyName(), - c.getPosition(), true, ((ProjectedColumn) c).getSourceColumnRef())); + c.getPosition(), true, ((ProjectedColumn) c).getSourceColumnRef(), SchemaUtil.isPKColumn(c) ? null : c.getName().getBytes())); } } else { merged.addAll(left.getColumns()); @@ -1294,14 +1300,13 @@ public class JoinCompiler { if (!SchemaUtil.isPKColumn(c)) { PColumn column = new ProjectedColumn(c.getName(), c.getFamilyName(), position++, type == JoinType.Inner ? c.isNullable() : true, - ((ProjectedColumn) c).getSourceColumnRef()); + ((ProjectedColumn) c).getSourceColumnRef(), c.getName().getBytes()); merged.add(column); } } if (left.getBucketNum() != null) { merged.remove(0); } - return PTableImpl.makePTable(left.getTenantId(), left.getSchemaName(), PNameFactory.newName(SchemaUtil.getTableName(left.getName().getString(), right.getName().getString())), left.getType(), left.getIndexState(), left.getTimeStamp(), left.getSequenceNumber(), left.getPKName(), @@ -1310,7 +1315,7 @@ public class JoinCompiler { left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), left.getIndexType(), left.rowKeyOrderOptimizable(), left.isTransactional(), left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp(), left.isNamespaceMapped(), - left.getAutoPartitionSeqName(), left.isAppendOnlySchema()); + left.getAutoPartitionSeqName(), left.isAppendOnlySchema(), ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java index 2df0671..8265de8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java @@ -57,6 +57,7 @@ import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.SortOrder; @@ -77,10 +78,11 @@ public class ListJarsQueryPlan implements QueryPlan { static { List<ExpressionProjector> projectedColumns = new ArrayList<ExpressionProjector>(); + PName colName = PNameFactory.newName("jar_location"); PColumn column = - new PColumnImpl(PNameFactory.newName("jar_location"), null, + new PColumnImpl(colName, null, PVarchar.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, - false, null, false, false); + false, null, false, false, colName.getBytes()); List<PColumn> columns = new ArrayList<PColumn>(); columns.add(column); Expression expression = http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index 393499a..e4ef25f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -50,6 +50,7 @@ import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.TransactionUtil; @@ -176,8 +177,8 @@ public class PostDDLCompiler { @Override public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException { PColumn column = tableName != null - ? tableRef.getTable().getColumnFamily(tableName).getColumn(colName) - : tableRef.getTable().getColumn(colName); + ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName) + : tableRef.getTable().getPColumnForColumnName(colName); return new ColumnRef(tableRef, column.getPosition()); } @@ -213,6 +214,7 @@ public class PostDDLCompiler { ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts); if (emptyCF != null) { scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF); + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst()); } ServerCache cache = null; try { @@ -236,11 +238,12 @@ public class PostDDLCompiler { // data empty column family to stay the same, while the index empty column family // changes. PColumn column = deleteList.get(0); + byte[] cq = column.getColumnQualifierBytes(); if (emptyCF == null) { - scan.addColumn(column.getFamilyName().getBytes(), column.getName().getBytes()); + scan.addColumn(column.getFamilyName().getBytes(), cq); } scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, column.getFamilyName().getBytes()); - scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, column.getName().getBytes()); + scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq); } } List<byte[]> columnFamilies = Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java index 81dbe0d..7e3c3b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java @@ -31,6 +31,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; @@ -77,12 +78,16 @@ public class PostLocalIndexDDLCompiler { // rows per region as a result. The value of the attribute will be our persisted // index maintainers. // Define the LOCAL_INDEX_BUILD as a new static in BaseScannerRegionObserver - scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, ByteUtil.copyKeyBytesIfNecessary(ptr)); // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*). // However, in this case, we need to project all of the data columns that contribute to the index. IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection); for (ColumnReference columnRef : indexMaintainer.getAllColumns()) { - scan.addColumn(columnRef.getFamily(), columnRef.getQualifier()); + if (index.getImmutableStorageScheme() == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { + scan.addFamily(columnRef.getFamily()); + } else { + scan.addColumn(columnRef.getFamily(), columnRef.getQualifier()); + } } // Go through MutationPlan abstraction so that we can create local indexes http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java index 99a9731..eef1ada 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java @@ -24,11 +24,9 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableSet; -import java.util.Set; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -44,14 +42,11 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.ProjectedColumnExpression; -import org.apache.phoenix.expression.aggregator.ClientAggregators; -import org.apache.phoenix.expression.aggregator.ServerAggregators; +import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.function.ArrayIndexFunction; -import org.apache.phoenix.expression.function.SingleAggregateFunction; import org.apache.phoenix.expression.visitor.ExpressionVisitor; import org.apache.phoenix.expression.visitor.ProjectedColumnExpressionVisitor; import org.apache.phoenix.expression.visitor.ReplaceArrayFunctionExpressionVisitor; -import org.apache.phoenix.expression.visitor.SingleAggregateFunctionVisitor; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.BindParseNode; @@ -78,6 +73,7 @@ import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; @@ -92,9 +88,7 @@ import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.SizedUtil; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; /** @@ -217,7 +211,7 @@ public class ProjectionCompiler { PColumn indexColumn = null; ColumnRef ref = null; try { - indexColumn = index.getColumn(indexColName); + indexColumn = index.getPColumnForColumnName(indexColName); ref = new ColumnRef(tableRef, indexColumn.getPosition()); } catch (ColumnNotFoundException e) { if (index.getIndexType() == IndexType.LOCAL) { @@ -289,7 +283,7 @@ public class ProjectionCompiler { ColumnRef ref = null; String indexColumnFamily = null; try { - indexColumn = index.getColumn(indexColName); + indexColumn = index.getPColumnForColumnName(indexColName); ref = new ColumnRef(tableRef, indexColumn.getPosition()); indexColumnFamily = indexColumn.getFamilyName() == null ? null : indexColumn.getFamilyName().getString(); } catch (ColumnNotFoundException e) { @@ -484,11 +478,13 @@ public class ProjectionCompiler { } } else { for (byte[] cq : entry.getValue()) { - PColumn column = family.getColumn(cq); - Integer maxLength = column.getMaxLength(); - int byteSize = column.getDataType().isFixedWidth() ? maxLength == null ? column.getDataType().getByteSize() : maxLength : RowKeySchema.ESTIMATED_VARIABLE_LENGTH_SIZE; - estimatedByteSize += SizedUtil.KEY_VALUE_SIZE + estimatedKeySize + byteSize; - } + //if (!Bytes.equals(cq, ByteUtil.EMPTY_BYTE_ARRAY) || cq.length > 0) { + PColumn column = family.getPColumnForColumnQualifier(cq); + Integer maxLength = column.getMaxLength(); + int byteSize = column.getDataType().isFixedWidth() ? maxLength == null ? column.getDataType().getByteSize() : maxLength : RowKeySchema.ESTIMATED_VARIABLE_LENGTH_SIZE; + estimatedByteSize += SizedUtil.KEY_VALUE_SIZE + estimatedKeySize + byteSize; + } + //} } } boolean isProjectEmptyKeyValue = false; @@ -663,7 +659,14 @@ public class ProjectionCompiler { public Void visit(ProjectedColumnExpression expression) { if (expression.getDataType().isArrayType()) { indexProjectedColumns.add(expression); - KeyValueColumnExpression keyValueColumnExpression = new KeyValueColumnExpression(expression.getColumn()); + PColumn col = expression.getColumn(); + PTable table = context.getCurrentTable().getTable(); + KeyValueColumnExpression keyValueColumnExpression; + if (table.getImmutableStorageScheme() != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) { + keyValueColumnExpression = new SingleCellColumnExpression(col, col.getName().getString(), table.getEncodingScheme()); + } else { + keyValueColumnExpression = new KeyValueColumnExpression(col); + } indexKVs.add(keyValueColumnExpression); copyOfChildren.set(0, keyValueColumnExpression); Integer count = arrayExpressionCounts.get(expression); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 2258f28..5126c8b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -94,7 +94,7 @@ public class QueryCompiler { */ private static final String LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR = "_ondemand_"; private final PhoenixStatement statement; - private final Scan scan; + private final Scan scan; private final Scan originalScan; private final ColumnResolver resolver; private final SelectStatement select; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java index ed5cda9..8fb435d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java @@ -51,6 +51,7 @@ import org.apache.phoenix.parse.TraceStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.SortOrder; @@ -75,10 +76,11 @@ public class TraceQueryPlan implements QueryPlan { private static final RowProjector TRACE_PROJECTOR; static { List<ExpressionProjector> projectedColumns = new ArrayList<ExpressionProjector>(); + PName colName = PNameFactory.newName(MetricInfo.TRACE.columnName); PColumn column = new PColumnImpl(PNameFactory.newName(MetricInfo.TRACE.columnName), null, PLong.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, - false, null, false, false); + false, null, false, false, colName.getBytes()); List<PColumn> columns = new ArrayList<PColumn>(); columns.add(column); Expression expression =