Repository: phoenix
Updated Branches:
  refs/heads/master 763a3566e -> 04504c34f


PHOENIX-2600 NPE on immutable index creation over transactional table


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/04504c34
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/04504c34
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/04504c34

Branch: refs/heads/master
Commit: 04504c34ffb2e39f38e1b37ee0d7f8f909537616
Parents: 763a356
Author: James Taylor <jamestay...@apache.org>
Authored: Fri Jan 15 18:04:07 2016 -0800
Committer: James Taylor <jamestay...@apache.org>
Committed: Fri Jan 15 18:04:07 2016 -0800

----------------------------------------------------------------------
 .../apache/phoenix/end2end/UpsertSelectIT.java  |  23 +
 .../org/apache/phoenix/tx/TransactionIT.java    | 442 ++++++++++---------
 .../apache/phoenix/compile/DeleteCompiler.java  |   1 -
 .../apache/phoenix/execute/MutationState.java   |  40 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   1 +
 5 files changed, 281 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/04504c34/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index b5252e0..364b423 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -1373,6 +1373,29 @@ public class UpsertSelectIT extends 
BaseClientManagedTimeIT {
         assertEquals("[[128,0,0,54], [128,0,4,0]]", rs.getArray(2).toString());
     }
 
+
+    @Test
+    public void testParallelUpsertSelect() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, 
Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, 
Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, 
Integer.toString(3));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        conn.createStatement().execute("CREATE SEQUENCE S1");
+        conn.createStatement().execute("CREATE TABLE SALTEDT1 (pk INTEGER 
PRIMARY KEY, val INTEGER) SALT_BUCKETS=4");
+        conn.createStatement().execute("CREATE TABLE T2 (pk INTEGER PRIMARY 
KEY, val INTEGER)");
+
+        for (int i = 0; i < 100; i++) {
+            conn.createStatement().execute("UPSERT INTO SALTEDT1 VALUES (NEXT 
VALUE FOR S1, " + (i%10) + ")");
+        }
+        conn.commit();
+        conn.setAutoCommit(true);
+        int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO T2 
SELECT pk, val FROM SALTEDT1");
+        assertEquals(100,upsertCount);
+        conn.close();
+    }
+
     private static Connection getConnection(long ts) throws SQLException {
         Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/04504c34/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index e08225c..2794c47 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -36,6 +36,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase11.TransactionAwareHTable;
+import co.cask.tephra.hbase11.coprocessor.TransactionProcessor;
+
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
@@ -68,16 +74,10 @@ import org.junit.Test;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase11.TransactionAwareHTable;
-import co.cask.tephra.hbase11.coprocessor.TransactionProcessor;
-
 public class TransactionIT extends BaseHBaseManagedTimeIT {
-       
-       private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + 
QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
-       
+    
+    private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + 
QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
+    
     @Before
     public void setUp() throws SQLException {
         ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE);
@@ -90,73 +90,73 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         props.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
-               
-       @Test
-       public void testReadOwnWrites() throws Exception {
-               String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
-               try (Connection conn = DriverManager.getConnection(getUrl())) {
-                       conn.setAutoCommit(false);
-                       ResultSet rs = 
conn.createStatement().executeQuery(selectSql);
-               assertFalse(rs.next());
-               
-               String upsert = "UPSERT INTO " + FULL_TABLE_NAME + 
"(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, 
?, ?)";
-               PreparedStatement stmt = conn.prepareStatement(upsert);
-                       // upsert two rows
-                       TestUtil.setRowKeyColumns(stmt, 1);
-                       stmt.execute();
-                       TestUtil.setRowKeyColumns(stmt, 2);
-                       stmt.execute();
-               
-               // verify rows can be read even though commit has not been 
called
-                       rs = conn.createStatement().executeQuery(selectSql);
-                       TestUtil.validateRowKeyColumns(rs, 1);
-                       TestUtil.validateRowKeyColumns(rs, 2);
-               assertFalse(rs.next());
-               
-               conn.commit();
-               
-               // verify rows can be read after commit
-               rs = conn.createStatement().executeQuery(selectSql);
-               TestUtil.validateRowKeyColumns(rs, 1);
-               TestUtil.validateRowKeyColumns(rs, 2);
-               assertFalse(rs.next());
-               }
-       }
-       
-       @Test
-       public void testTxnClosedCorrecty() throws Exception {
-               String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
-               try (Connection conn = DriverManager.getConnection(getUrl())) {
-                       conn.setAutoCommit(false);
-                       ResultSet rs = 
conn.createStatement().executeQuery(selectSql);
-               assertFalse(rs.next());
-               
-               String upsert = "UPSERT INTO " + FULL_TABLE_NAME + 
"(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, 
?, ?)";
-               PreparedStatement stmt = conn.prepareStatement(upsert);
-                       // upsert two rows
-                       TestUtil.setRowKeyColumns(stmt, 1);
-                       stmt.execute();
-                       TestUtil.setRowKeyColumns(stmt, 2);
-                       stmt.execute();
-               
-               // verify rows can be read even though commit has not been 
called
-                       rs = conn.createStatement().executeQuery(selectSql);
-                       TestUtil.validateRowKeyColumns(rs, 1);
-                       TestUtil.validateRowKeyColumns(rs, 2);
-               assertFalse(rs.next());
-               
-               conn.close();
-               // wait for any open txns to time out
-               Thread.sleep(DEFAULT_TXN_TIMEOUT_SECONDS*1000+10000);
-               assertTrue("There should be no invalid transactions", 
txManager.getInvalidSize()==0);
-               }
-       }
-       
+        
+    @Test
+    public void testReadOwnWrites() throws Exception {
+        String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertFalse(rs.next());
+            
+            String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, 
char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            // upsert two rows
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.execute();
+            TestUtil.setRowKeyColumns(stmt, 2);
+            stmt.execute();
+            
+            // verify rows can be read even though commit has not been called
+            rs = conn.createStatement().executeQuery(selectSql);
+            TestUtil.validateRowKeyColumns(rs, 1);
+            TestUtil.validateRowKeyColumns(rs, 2);
+            assertFalse(rs.next());
+            
+            conn.commit();
+            
+            // verify rows can be read after commit
+            rs = conn.createStatement().executeQuery(selectSql);
+            TestUtil.validateRowKeyColumns(rs, 1);
+            TestUtil.validateRowKeyColumns(rs, 2);
+            assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testTxnClosedCorrecty() throws Exception {
+        String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertFalse(rs.next());
+            
+            String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, 
char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            // upsert two rows
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.execute();
+            TestUtil.setRowKeyColumns(stmt, 2);
+            stmt.execute();
+            
+            // verify rows can be read even though commit has not been called
+            rs = conn.createStatement().executeQuery(selectSql);
+            TestUtil.validateRowKeyColumns(rs, 1);
+            TestUtil.validateRowKeyColumns(rs, 2);
+            assertFalse(rs.next());
+            
+            conn.close();
+            // wait for any open txns to time out
+            Thread.sleep(DEFAULT_TXN_TIMEOUT_SECONDS*1000+10000);
+            assertTrue("There should be no invalid transactions", 
txManager.getInvalidSize()==0);
+        }
+    }
+    
     @Test
     public void testDelete() throws Exception {
         String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME;
         try (Connection conn1 = DriverManager.getConnection(getUrl()); 
-                       Connection conn2 = 
DriverManager.getConnection(getUrl())) {
+                Connection conn2 = DriverManager.getConnection(getUrl())) {
             conn1.setAutoCommit(false);
             ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
             assertFalse(rs.next());
@@ -188,19 +188,19 @@ public class TransactionIT extends BaseHBaseManagedTimeIT 
{
         }
     }
     
-       @Test
-       public void testAutoCommitQuerySingleTable() throws Exception {
-               try (Connection conn = DriverManager.getConnection(getUrl())) {
-                       conn.setAutoCommit(true);
-                       // verify no rows returned
-                       ResultSet rs = 
conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME);
-                       assertFalse(rs.next());
-               }
-       }
-       
+    @Test
+    public void testAutoCommitQuerySingleTable() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            // verify no rows returned
+            ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM 
" + FULL_TABLE_NAME);
+            assertFalse(rs.next());
+        }
+    }
+    
     @Test
     public void testAutoCommitQueryMultiTables() throws Exception {
-       try (Connection conn = DriverManager.getConnection(getUrl())) {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.setAutoCommit(true);
             // verify no rows returned
             ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM 
" + FULL_TABLE_NAME + " a JOIN " + FULL_TABLE_NAME + " b ON (a.long_pk = 
b.int_pk)");
@@ -208,87 +208,87 @@ public class TransactionIT extends BaseHBaseManagedTimeIT 
{
         } 
     }
     
-       @Test
-       public void testColConflicts() throws Exception {
-               try (Connection conn1 = DriverManager.getConnection(getUrl()); 
-                       Connection conn2 = 
DriverManager.getConnection(getUrl())) {
-                       conn1.setAutoCommit(false);
-                       conn2.setAutoCommit(false);
-                       String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
-                       conn1.setAutoCommit(false);
-                       ResultSet rs = 
conn1.createStatement().executeQuery(selectSql);
-               assertFalse(rs.next());
-                       // upsert row using conn1
-                       String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + 
"(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) 
VALUES(?, ?, ?, ?, ?, ?, ?)";
-                       PreparedStatement stmt = 
conn1.prepareStatement(upsertSql);
-                       TestUtil.setRowKeyColumns(stmt, 1);
-                       stmt.setInt(7, 10);
-               stmt.execute();
-               // upsert row using conn2
-                       stmt = conn2.prepareStatement(upsertSql);
-                       TestUtil.setRowKeyColumns(stmt, 1);
-                       stmt.setInt(7, 11);
-               stmt.execute();
-               
-               conn1.commit();
-               //second commit should fail
-               try {
-                       conn2.commit();
-                       fail();
-               }       
-               catch (SQLException e) {
-                       assertEquals(e.getErrorCode(), 
SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode());
-               }
-               }
-       }
-       
-       private void testRowConflicts() throws Exception {
-               try (Connection conn1 = DriverManager.getConnection(getUrl()); 
-                       Connection conn2 = 
DriverManager.getConnection(getUrl())) {
-                       conn1.setAutoCommit(false);
-                       conn2.setAutoCommit(false);
-                       String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
-                       conn1.setAutoCommit(false);
-                       ResultSet rs = 
conn1.createStatement().executeQuery(selectSql);
-                       boolean immutableRows = 
conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, 
FULL_TABLE_NAME)).isImmutableRows();
-               assertFalse(rs.next());
-                       // upsert row using conn1
-                       String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + 
"(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) 
VALUES(?, ?, ?, ?, ?, ?, ?)";
-                       PreparedStatement stmt = 
conn1.prepareStatement(upsertSql);
-                       TestUtil.setRowKeyColumns(stmt, 1);
-                       stmt.setInt(7, 10);
-               stmt.execute();
-               // upsert row using conn2
-               upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, 
char_pk, int_pk, long_pk, decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, 
?, ?, ?)";
-                       stmt = conn2.prepareStatement(upsertSql);
-                       TestUtil.setRowKeyColumns(stmt, 1);
-                       stmt.setInt(7, 11);
-               stmt.execute();
-               
-               conn1.commit();
-               //second commit should fail
-               try {
-                       conn2.commit();
-                       if (!immutableRows) fail();
-               }       
-               catch (SQLException e) {
-                       if (immutableRows) fail();
-                       assertEquals(e.getErrorCode(), 
SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode());
-               }
-               }
-       }
-       
-       @Test
-       public void testRowConflictDetected() throws Exception {
-               testRowConflicts();
-       }
-       
-       @Test
-       public void testNoConflictDetectionForImmutableRows() throws Exception {
-               Connection conn = DriverManager.getConnection(getUrl());
-               conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME 
+ " SET IMMUTABLE_ROWS=true");
-               testRowConflicts();
-       }
+    @Test
+    public void testColConflicts() throws Exception {
+        try (Connection conn1 = DriverManager.getConnection(getUrl()); 
+                Connection conn2 = DriverManager.getConnection(getUrl())) {
+            conn1.setAutoCommit(false);
+            conn2.setAutoCommit(false);
+            String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+            conn1.setAutoCommit(false);
+            ResultSet rs = conn1.createStatement().executeQuery(selectSql);
+            assertFalse(rs.next());
+            // upsert row using conn1
+            String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + 
"(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) 
VALUES(?, ?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn1.prepareStatement(upsertSql);
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.setInt(7, 10);
+            stmt.execute();
+            // upsert row using conn2
+            stmt = conn2.prepareStatement(upsertSql);
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.setInt(7, 11);
+            stmt.execute();
+            
+            conn1.commit();
+            //second commit should fail
+            try {
+                conn2.commit();
+                fail();
+            }   
+            catch (SQLException e) {
+                assertEquals(e.getErrorCode(), 
SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode());
+            }
+        }
+    }
+    
+    private void testRowConflicts() throws Exception {
+        try (Connection conn1 = DriverManager.getConnection(getUrl()); 
+                Connection conn2 = DriverManager.getConnection(getUrl())) {
+            conn1.setAutoCommit(false);
+            conn2.setAutoCommit(false);
+            String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+            conn1.setAutoCommit(false);
+            ResultSet rs = conn1.createStatement().executeQuery(selectSql);
+            boolean immutableRows = 
conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, 
FULL_TABLE_NAME)).isImmutableRows();
+            assertFalse(rs.next());
+            // upsert row using conn1
+            String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + 
"(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) 
VALUES(?, ?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn1.prepareStatement(upsertSql);
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.setInt(7, 10);
+            stmt.execute();
+            // upsert row using conn2
+            upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, 
char_pk, int_pk, long_pk, decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, 
?, ?, ?)";
+            stmt = conn2.prepareStatement(upsertSql);
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.setInt(7, 11);
+            stmt.execute();
+            
+            conn1.commit();
+            //second commit should fail
+            try {
+                conn2.commit();
+                if (!immutableRows) fail();
+            }   
+            catch (SQLException e) {
+                if (immutableRows) fail();
+                assertEquals(e.getErrorCode(), 
SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode());
+            }
+        }
+    }
+    
+    @Test
+    public void testRowConflictDetected() throws Exception {
+        testRowConflicts();
+    }
+    
+    @Test
+    public void testNoConflictDetectionForImmutableRows() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME + " 
SET IMMUTABLE_ROWS=true");
+        testRowConflicts();
+    }
     
     @Test
     public void testNonTxToTxTable() throws Exception {
@@ -514,33 +514,33 @@ public class TransactionIT extends BaseHBaseManagedTimeIT 
{
     }
 
     public void testCurrentDate() throws Exception {
-               String selectSql = "SELECT current_date() FROM 
"+FULL_TABLE_NAME;
-               try (Connection conn = DriverManager.getConnection(getUrl())) {
-                       conn.setAutoCommit(false);
-                       ResultSet rs = 
conn.createStatement().executeQuery(selectSql);
-               assertFalse(rs.next());
-               
-               String upsert = "UPSERT INTO " + FULL_TABLE_NAME + 
"(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, 
?, ?)";
-               PreparedStatement stmt = conn.prepareStatement(upsert);
-                       // upsert two rows
-                       TestUtil.setRowKeyColumns(stmt, 1);
-                       stmt.execute();
-                       conn.commit();
-                       
-                       rs = conn.createStatement().executeQuery(selectSql);
-                       assertTrue(rs.next());
-                       Date date1 = rs.getDate(1);
-               assertFalse(rs.next());
-               
-               Thread.sleep(1000);
-               
-               rs = conn.createStatement().executeQuery(selectSql);
-                       assertTrue(rs.next());
-                       Date date2 = rs.getDate(1);
-               assertFalse(rs.next());
-               assertTrue("current_date() should change while executing 
multiple statements", date2.getTime() > date1.getTime());
-               }
-       }
+        String selectSql = "SELECT current_date() FROM "+FULL_TABLE_NAME;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertFalse(rs.next());
+            
+            String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, 
char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            // upsert two rows
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.execute();
+            conn.commit();
+            
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            Date date1 = rs.getDate(1);
+            assertFalse(rs.next());
+            
+            Thread.sleep(1000);
+            
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            Date date2 = rs.getDate(1);
+            assertFalse(rs.next());
+            assertTrue("current_date() should change while executing multiple 
statements", date2.getTime() > date1.getTime());
+        }
+    }
     
     @Test
     public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws 
SQLException {
@@ -558,32 +558,32 @@ public class TransactionIT extends BaseHBaseManagedTimeIT 
{
     
     @Test
     public void testRowTimestampDisabled() throws SQLException {
-       Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-               conn.setAutoCommit(false);
-               Statement stmt = conn.createStatement();
-               try {
-                       stmt.execute("CREATE TABLE DEMO(k VARCHAR, v VARCHAR, d 
DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP)) 
TRANSACTIONAL=true");
-                       fail();
-               }
-               catch(SQLException e) {
-                       
assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(),
 e.getErrorCode());
-               }
-               stmt.execute("CREATE TABLE DEMO(k VARCHAR, v VARCHAR, d DATE 
NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP))");
-               try {
-                       stmt.execute("ALTER TABLE DEMO SET TRANSACTIONAL=true");
-                       fail();
-               }
-               catch(SQLException e) {
-                       
assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(),
 e.getErrorCode());
-               }
+            conn.setAutoCommit(false);
+            Statement stmt = conn.createStatement();
+            try {
+                stmt.execute("CREATE TABLE DEMO(k VARCHAR, v VARCHAR, d DATE 
NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP)) TRANSACTIONAL=true");
+                fail();
+            }
+            catch(SQLException e) {
+                
assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(),
 e.getErrorCode());
+            }
+            stmt.execute("CREATE TABLE DEMO(k VARCHAR, v VARCHAR, d DATE NOT 
NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP))");
+            try {
+                stmt.execute("ALTER TABLE DEMO SET TRANSACTIONAL=true");
+                fail();
+            }
+            catch(SQLException e) {
+                
assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(),
 e.getErrorCode());
+            }
         }
     }
     
     @Test
     public void testReadOnlyView() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
-               String ddl = "CREATE TABLE t (k INTEGER NOT NULL PRIMARY KEY, 
v1 DATE) TRANSACTIONAL=true";
+        String ddl = "CREATE TABLE t (k INTEGER NOT NULL PRIMARY KEY, v1 DATE) 
TRANSACTIONAL=true";
         conn.createStatement().execute(ddl);
         ddl = "CREATE VIEW v (v2 VARCHAR) AS SELECT * FROM t where k>4";
         conn.createStatement().execute(ddl);
@@ -870,4 +870,26 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
             assertFalse(rs.next());
         }
     }
+
+    @Test
+    public void testParallelUpsertSelect() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, 
Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, 
Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, 
Integer.toString(3));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        conn.createStatement().execute("CREATE SEQUENCE S1");
+        conn.createStatement().execute("CREATE TABLE SALTEDT1 (pk INTEGER 
PRIMARY KEY, val INTEGER) SALT_BUCKETS=4,TRANSACTIONAL=true");
+        conn.createStatement().execute("CREATE TABLE T2 (pk INTEGER PRIMARY 
KEY, val INTEGER) TRANSACTIONAL=true");
+
+        for (int i = 0; i < 100; i++) {
+            conn.createStatement().execute("UPSERT INTO SALTEDT1 VALUES (NEXT 
VALUE FOR S1, " + (i%10) + ")");
+        }
+        conn.commit();
+        conn.setAutoCommit(true);
+        int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO T2 
SELECT pk, val FROM SALTEDT1");
+        assertEquals(100,upsertCount);
+        conn.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/04504c34/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 924ed43..4c41f82 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -395,7 +395,6 @@ public class DeleteCompiler {
             break;
         }
         final QueryPlan dataPlan = dataPlanToBe;
-        final ColumnResolver resolver = resolverToBe;
         final boolean hasImmutableIndexes = !immutableIndex.isEmpty();
         // tableRefs is parallel with queryPlans
         TableRef[] tableRefs = new TableRef[hasImmutableIndexes ? 
immutableIndex.size() : 1];

http://git-wip-us.apache.org/repos/asf/phoenix/blob/04504c34/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 41d677a..35a36e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -37,6 +37,18 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.TransactionCodec;
+import co.cask.tephra.TransactionConflictException;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.hbase11.TransactionAwareHTable;
+import co.cask.tephra.visibility.FenceWait;
+import co.cask.tephra.visibility.VisibilityFence;
+
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -98,18 +110,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import co.cask.tephra.Transaction;
-import co.cask.tephra.Transaction.VisibilityLevel;
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TransactionCodec;
-import co.cask.tephra.TransactionConflictException;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.hbase11.TransactionAwareHTable;
-import co.cask.tephra.visibility.FenceWait;
-import co.cask.tephra.visibility.VisibilityFence;
-
 /**
  * 
  * Tracks the uncommitted state
@@ -242,7 +242,18 @@ public class MutationState implements SQLCloseable {
         }
     }
     
-    private void addReadFence(PTable dataTable) throws SQLException {
+    /**
+     * Add an entry to the change set representing the DML operation that is 
starting.
+     * These entries will not conflict with each other, but they will conflict 
with a
+     * DDL operation of creating an index. See {@link #addReadFence(PTable)} 
and TEPHRA-157
+     * for more information.
+     * @param dataTable the table which is doing DML
+     * @throws SQLException
+     */
+    public void addReadFence(PTable dataTable) throws SQLException {
+        if (this.txContext == null) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
+        }
         byte[] logicalKey = SchemaUtil.getTableKey(dataTable);
         this.txContext.addTransactionAware(VisibilityFence.create(logicalKey));
         byte[] physicalKey = dataTable.getPhysicalName().getBytes();
@@ -848,8 +859,7 @@ public class MutationState implements SQLCloseable {
                    final PTable table = tableRef.getTable();
                    // Track tables to which we've sent uncommitted data
                    if (isTransactional = table.isTransactional()) {
-                       addReadFence(table);
-                    txTableRefs.add(tableRef);
+                       txTableRefs.add(tableRef);
                        
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
                    }
                    boolean isDataTable = true;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/04504c34/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 7e8969b..6bb5722 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -331,6 +331,7 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                                 MutationPlan plan = 
stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
                                 if (plan.getTargetRef() != null && 
plan.getTargetRef().getTable() != null && 
plan.getTargetRef().getTable().isTransactional()) {
                                     state.startTransaction();
+                                    
state.addReadFence(plan.getTargetRef().getTable());
                                 }
                                 Iterator<TableRef> tableRefs = 
plan.getSourceRefs().iterator();
                                 state.sendUncommitted(tableRefs);

Reply via email to