Repository: phoenix
Updated Branches:
  refs/heads/txn b5e9396de -> 8911fdbcc


Added TransactionIT


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8911fdbc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8911fdbc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8911fdbc

Branch: refs/heads/txn
Commit: 8911fdbccac60a191add0a7d190bb574846b7470
Parents: b5e9396
Author: Thomas D'Silva <twdsi...@gmail.com>
Authored: Thu Mar 12 17:57:57 2015 -0700
Committer: Thomas D'Silva <twdsi...@gmail.com>
Committed: Thu Mar 12 17:57:57 2015 -0700

----------------------------------------------------------------------
 .../phoenix/transactions/TransactionIT.java     | 170 +++++++++++++++++++
 1 file changed, 170 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8911fdbc/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
new file mode 100644
index 0000000..aaa2cb6
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the 
License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.transactions;
+
+import static org.junit.Assert.*;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import co.cask.tephra.TransactionManager;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.distributed.TransactionService;
+import co.cask.tephra.metrics.TxMetricsCollector;
+import co.cask.tephra.persist.InMemoryTransactionStateStorage;
+
+public class TransactionIT extends BaseHBaseManagedTimeIT {
+
+       @BeforeClass
+       public static void setupBeforeClass() throws Exception {
+               config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+//             config.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, 
ConnectionInfo.getZookeeperConnectionString(getUrl()));
+               
config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
+               config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 
1);
+
+               ConnectionInfo connInfo = ConnectionInfo.create(getUrl());
+           ZKClientService zkClient = ZKClientServices.delegate(
+             ZKClients.reWatchOnExpire(
+               ZKClients.retryOnFailure(
+                 
ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
+                   
.setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
+                               HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+                   .build(),
+                 RetryStrategies.exponentialDelay(500, 2000, 
TimeUnit.MILLISECONDS)
+               )
+             )
+           );
+           zkClient.startAndWait();
+
+           DiscoveryService discovery = new ZKDiscoveryService(zkClient);
+           final TransactionManager txManager = new TransactionManager(config, 
new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+           TransactionService txService = new TransactionService(config, 
zkClient, discovery, txManager);
+           txService.startAndWait();
+       }
+       
+       @Test
+       public void testUpsert() throws Exception {
+               Connection conn = DriverManager.getConnection(getUrl());
+               String ddl = "CREATE TABLE t (k1 INTEGER PRIMARY KEY, k2 
INTEGER) transactional=true";
+               try {
+                       conn.setAutoCommit(false);
+                       conn.createStatement().execute(ddl);
+                       // upsert one row
+                       PreparedStatement stmt = conn.prepareStatement("UPSERT 
INTO t VALUES(?,?)");
+               stmt.setInt(1, 1);
+               stmt.setInt(2, 1);
+               stmt.execute();
+               conn.commit();
+               // verify row exists
+               ResultSet rs = conn.createStatement().executeQuery("SELECT * 
FROM t");
+               assertTrue(rs.next());
+               assertEquals(1,rs.getInt(1));
+               assertEquals(1,rs.getInt(1));
+               assertFalse(rs.next());
+               }
+        finally {
+               conn.close();
+        }
+       }
+       
+       @Test
+       public void testColConflicts() throws Exception {
+               Connection conn1 = DriverManager.getConnection(getUrl());
+               Connection conn2 = DriverManager.getConnection(getUrl());
+               String ddl = "CREATE TABLE t (k1 INTEGER PRIMARY KEY, k2 
INTEGER) transactional=true";
+               try {
+                       conn1.setAutoCommit(false);
+                       conn2.setAutoCommit(false);
+                       conn1.createStatement().execute(ddl);
+                       // upsert row using conn1
+                       PreparedStatement stmt = conn1.prepareStatement("UPSERT 
INTO t VALUES(?,?)");
+               stmt.setInt(1, 1);
+               stmt.setInt(2, 10);
+               stmt.execute();
+               // upsert row using conn2
+                       stmt = conn2.prepareStatement("UPSERT INTO t 
VALUES(?,?)");
+               stmt.setInt(1, 1);
+               stmt.setInt(2, 11);
+               stmt.execute();
+               
+               conn1.commit();
+               //second commit should fail
+               try {
+                       conn2.commit();
+                       fail();
+               }       
+               catch (SQLException e) {
+                       assertEquals(e.getErrorCode(), 
SQLExceptionCode.TRANSACTION_FINISH_EXCEPTION.getErrorCode());
+               }
+               }
+        finally {
+               conn1.close();
+        }
+       }
+       
+       @Test
+       public void testRowConflicts() throws Exception {
+               Connection conn1 = DriverManager.getConnection(getUrl());
+               Connection conn2 = DriverManager.getConnection(getUrl());
+               String ddl = "CREATE TABLE t (k1 INTEGER PRIMARY KEY, k2 
INTEGER, k3 INTEGER) transactional=true";
+               try {
+                       conn1.setAutoCommit(false);
+                       conn2.setAutoCommit(false);
+                       conn1.createStatement().execute(ddl);
+                       // upsert row using conn1
+                       PreparedStatement stmt = conn1.prepareStatement("UPSERT 
INTO t(k1,k2) VALUES(?,?)");
+               stmt.setInt(1, 1);
+               stmt.setInt(2, 10);
+               stmt.execute();
+               // upsert row using conn2
+                       stmt = conn2.prepareStatement("UPSERT INTO t(k1,k3) 
VALUES(?,?)");
+               stmt.setInt(1, 1);
+               stmt.setInt(2, 11);
+               stmt.execute();
+               
+               conn1.commit();
+               //second commit should fail
+               try {
+                       conn2.commit();
+                       fail();
+               }       
+               catch (SQLException e) {
+                       assertEquals(e.getErrorCode(), 
SQLExceptionCode.TRANSACTION_FINISH_EXCEPTION.getErrorCode());
+               }
+               }
+        finally {
+               conn1.close();
+        }
+       }
+
+
+//     @AfterClass
+//     public static void shutdownAfterClass() throws Exception {
+//             txManager.stop();
+//     }
+
+}

Reply via email to