Repository: phoenix
Updated Branches:
  refs/heads/txn 8d5e3691b -> d1c2306da


Start txn on first stmt exec


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d1c2306d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d1c2306d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d1c2306d

Branch: refs/heads/txn
Commit: d1c2306daa6a3f93abeff0d5f8d177eb79ad5162
Parents: 8d5e369
Author: James Taylor <jtay...@salesforce.com>
Authored: Thu Mar 12 16:47:55 2015 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Thu Mar 12 16:47:55 2015 -0700

----------------------------------------------------------------------
 .../ConnectionQueryServicesTestImpl.java        |  2 +-
 .../apache/phoenix/execute/MutationState.java   | 71 +++++-----------
 .../phoenix/iterate/TableResultIterator.java    |  4 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  | 87 +++++++++++---------
 .../org/apache/phoenix/jdbc/PhoenixDriver.java  |  2 +-
 .../phoenix/jdbc/PhoenixEmbeddedDriver.java     |  5 --
 .../apache/phoenix/jdbc/PhoenixStatement.java   | 14 ++++
 .../phoenix/query/ConnectionQueryServices.java  |  4 +
 .../query/ConnectionQueryServicesImpl.java      | 36 ++++++++
 .../query/ConnectionlessQueryServicesImpl.java  | 35 +++++++-
 .../query/DelegateConnectionQueryServices.java  |  7 ++
 .../apache/phoenix/jdbc/PhoenixTestDriver.java  |  4 +-
 12 files changed, 166 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index bee8d21..2edab92 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -35,7 +35,7 @@ import org.apache.phoenix.query.QueryServices;
 public class ConnectionQueryServicesTestImpl extends 
ConnectionQueryServicesImpl {
     protected int NUM_SLAVES_BASE = 1; // number of slaves for the cluster
     
-    public ConnectionQueryServicesTestImpl(QueryServices services, 
ConnectionInfo info) throws SQLException {
+    public ConnectionQueryServicesTestImpl(QueryServices services, 
ConnectionInfo info, Properties info2) throws SQLException {
         super(services, info, null);
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 45b9dd6..c452991 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -27,10 +27,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 
-import co.cask.tephra.TransactionAware;
 import co.cask.tephra.hbase98.TransactionAwareHTable;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -89,8 +86,6 @@ public class MutationState implements SQLCloseable {
     //   rows - map from rowkey to columns
     //      columns - map from column to value
     private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
-    // map from table ref to htable (possibly a TransactionAwareHTable)
-    private Map<TableRef, HTableInterface> tableRefToHTableMap;
     private long sizeOffset;
     private int numRows = 0;
     
@@ -360,29 +355,6 @@ public class MutationState implements SQLCloseable {
         }
     }
     
-    /**
-     * Creates a map from table ref to htable which is used by {@link 
PhoenixConnection} 
-     * for transactions
-     * @return list of transaction aware htables
-     */
-    public List<TransactionAware> preSend() throws SQLException {
-       List<TransactionAware> txAwareHTables = 
Lists.newArrayListWithExpectedSize(mutations.size());
-       tableRefToHTableMap = Maps.newHashMapWithExpectedSize(mutations.size());
-       for ( TableRef tableRef : this.mutations.keySet()) {
-               PTable table = tableRef.getTable();
-               byte[] hTableName = table.getPhysicalName().getBytes();
-               HTableInterface hTable = 
connection.getQueryServices().getTable(hTableName);
-               if (table.isTransactional()) {
-                       TransactionAwareHTable transactionAwareHTable = new 
TransactionAwareHTable(hTable);
-               txAwareHTables.add(transactionAwareHTable);
-               hTable = transactionAwareHTable;
-               }
-               tableRefToHTableMap.put(tableRef, hTable);
-       }
-       return txAwareHTables;
-    }
-    
-    
     @SuppressWarnings("deprecation")
     public void send() throws SQLException {
         int i = 0;
@@ -447,8 +419,15 @@ public class MutationState implements SQLCloseable {
                     }
                     
                     SQLException sqlE = null;
-                    HTableInterface hTable = tableRefToHTableMap.get(tableRef);
+                    HTableInterface hTable = 
connection.getQueryServices().getTable(table.getPhysicalName().getBytes());
                     try {
+                        // Don't add immutable indexes (those are the only 
ones that would participate
+                        // during a commit), as we don't need conflict 
detection for these.
+                        if (table.isTransactional() && table.getType() != 
PTableType.INDEX) {
+                            TransactionAwareHTable txnAware = new 
TransactionAwareHTable(hTable);
+                            connection.addTxParticipant(txnAware);
+                            hTable = txnAware;
+                        }
                         logMutationSize(hTable, mutations, connection);
                         MUTATION_BATCH_SIZE.update(mutations.size());
                         long startTime = System.currentTimeMillis();
@@ -489,6 +468,16 @@ public class MutationState implements SQLCloseable {
                                 cache.close();
                             }
                         } finally {
+                            try {
+                                hTable.close();
+                            } 
+                            catch (IOException e) {
+                                if (sqlE != null) {
+                                    
sqlE.setNextException(ServerUtil.parseServerException(e));
+                                } else {
+                                    sqlE = ServerUtil.parseServerException(e);
+                                }
+                            } 
                             if (sqlE != null) {
                                 throw sqlE;
                             }
@@ -507,29 +496,7 @@ public class MutationState implements SQLCloseable {
         assert(this.mutations.isEmpty());
     }
     
-    public void postSend() throws SQLException {
-       SQLException sqlE = null;
-               for (Entry<TableRef, HTableInterface> entry : 
tableRefToHTableMap.entrySet()) {
-                       HTableInterface hTable = entry.getValue();
-                       try {
-                               hTable.close();
-                       } 
-                       catch (IOException e) {
-                               if (sqlE != null) {
-                                       
sqlE.setNextException(ServerUtil.parseServerException(e));
-                               } else {
-                                       sqlE = 
ServerUtil.parseServerException(e);
-                               }
-                       } 
-                       finally {
-                               if (sqlE != null) {
-                    throw sqlE;
-                }
-                       }
-               }
-    }
-    
-    public void clear(PhoenixConnection connection) throws SQLException {
+    public void clear() throws SQLException {
         this.mutations.clear();
         numRows = 0;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 91aa573..5beb2e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -86,7 +86,9 @@ public class TableResultIterator extends ExplainTable 
implements ResultIterator
         PTable table = tableRef.getTable();
         HTableInterface htable = 
context.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
         if (table.isTransactional()) {
-            htable = new TransactionAwareHTable(htable);
+            TransactionAwareHTable txnAware = new 
TransactionAwareHTable(htable);
+            context.getConnection().addTxParticipant(txnAware);
+            htable = txnAware;
         }
         this.htable = htable;
         if (creationMode == ScannerCreation.IMMEDIATE) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index a24f76b..613d3e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -57,7 +57,7 @@ import javax.annotation.Nullable;
 import co.cask.tephra.TransactionAware;
 import co.cask.tephra.TransactionContext;
 import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.distributed.TransactionServiceClient;
+import co.cask.tephra.TransactionSystemClient;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.call.CallRunner;
@@ -128,7 +128,6 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
     private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
     private final Map<PDataType<?>, Format> formatters = new HashMap<>();
     private final MutationState mutationState;
-    private final TransactionServiceClient transactionServiceClient;
     private final int mutateBatchSize;
     private final Long scn;
     private boolean isAutoCommit = false;
@@ -139,6 +138,7 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
     private final String timestampPattern;
     private TraceScope traceScope = null;
     
+    private TransactionContext txContext;
     private boolean isClosed = false;
     private Sampler<?> sampler;
     private boolean readOnly = false;
@@ -247,33 +247,10 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
         // setup tracing, if its enabled
         this.sampler = Tracing.getConfiguredSampler(this);
         this.customTracingAnnotations = getImmutableCustomTracingAnnotations();
-        
-        //create a transaction service client
-        /* commenting out for now as this breaks many unit tests
-         * TODO: Move to ConnectionQueryServicesImpl 
-        Configuration config = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
-        String zkQuorumServersString = 
ConnectionInfo.getZookeeperConnectionString(url);
-        ZKClientService zkClientService = ZKClientServices.delegate(
-                     ZKClients.reWatchOnExpire(
-                       ZKClients.retryOnFailure(
-                         ZKClientService.Builder.of(zkQuorumServersString)
-                           
.setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, 
HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
-                           .build(),
-                         RetryStrategies.exponentialDelay(500, 2000, 
TimeUnit.MILLISECONDS)
-                       )
-                     )
-                   );
-        zkClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new 
ZKDiscoveryService(zkClientService);
-        PooledClientProvider pooledClientProvider = new PooledClientProvider(
-                       config, zkDiscoveryService);
-        this.transactionServiceClient = new 
TransactionServiceClient(config,pooledClientProvider);
-        */
-        this.transactionServiceClient = null;
     }
     
     public TransactionContext getTransactionContext() {
-        return null; // TODO
+        return txContext;
     }
     
     private ImmutableMap<String, String> 
getImmutableCustomTracingAnnotations() {
@@ -430,7 +407,7 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
         // from modifying this list.
         this.statements = Lists.newArrayList();
         try {
-            mutationState.clear(this);
+            mutationState.clear();
         } finally {
             try {
                 SQLCloseables.closeAll(statements);
@@ -459,24 +436,42 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
         }
     }
 
+    public void startTransaction() throws SQLException {
+        if (txContext == null) {
+            try {
+                TransactionSystemClient txServiceClient = 
this.getQueryServices().getTransactionSystemClient();
+                this.txContext = new TransactionContext(txServiceClient);
+                txContext.start();
+            } catch (TransactionFailureException e) {
+                throw new SQLException(e); // TODO: error code
+            }
+        }
+    }
+    
+    public void addTxParticipant(TransactionAware txnAware) {
+        txContext.addTransactionAware(txnAware);
+    }
+    
+    private boolean isTransactionStarted() {
+        return txContext != null;
+    }
+    
+    private void endTransaction() {
+        txContext = null;
+    }
+    
     @Override
     public void commit() throws SQLException {
         CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() {
             @Override
             public Void call() throws SQLException {
-               List<TransactionAware> txAwareHTables = mutationState.preSend();
-                               if (txAwareHTables.isEmpty()) {
-                                       mutationState.send();
-                               } 
-                               else {
-                                       TransactionContext transactionContext = 
new TransactionContext(transactionServiceClient, txAwareHTables);
+                mutationState.send();
+                if (isTransactionStarted()) {
                                        try {
-                                               transactionContext.start();
-                               mutationState.send();
-                                               transactionContext.finish();
+                                               txContext.finish();
                                        } catch (TransactionFailureException e) 
{
                                                try {
-                                                       
transactionContext.abort();
+                                                   txContext.abort();
                                                        throw new 
SQLExceptionInfo.Builder(
                                                                        
SQLExceptionCode.TRANSACTION_FINISH_EXCEPTION)
                                                                        
.setRootCause(e).build().buildException();
@@ -485,12 +480,13 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
                                                                        
SQLExceptionCode.TRANSACTION_ABORT_EXCEPTION)
                                                                        
.setRootCause(e1).build().buildException();
                                                }
+                                       } finally {
+                                           endTransaction();
                                        }
-                               }
-                               mutationState.postSend();
+                }
                 return null;
             }
-        }, Tracing.withTracing(this, "committing mutations"));
+        }, Tracing.withTracing(this, "sending mutations"));
     }
 
     @Override
@@ -690,7 +686,16 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
 
     @Override
     public void rollback() throws SQLException {
-        mutationState.clear(this);
+        mutationState.clear();
+        if (isTransactionStarted()) {
+            try {
+                txContext.abort();
+            } catch (TransactionFailureException e) {
+                throw new SQLException(e); // TODO: error code
+            } finally {
+                endTransaction();
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index 6360d06..c45a35d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -147,7 +147,7 @@ public final class PhoenixDriver extends 
PhoenixEmbeddedDriver {
             ConnectionQueryServices connectionQueryServices = 
connectionQueryServicesMap.get(normalizedConnInfo);
             if (connectionQueryServices == null) {
                 if (normalizedConnInfo.isConnectionless()) {
-                    connectionQueryServices = new 
ConnectionlessQueryServicesImpl(services, normalizedConnInfo);
+                    connectionQueryServices = new 
ConnectionlessQueryServicesImpl(services, normalizedConnInfo, info);
                 } else {
                     connectionQueryServices = new 
ConnectionQueryServicesImpl(services, normalizedConnInfo, info);
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index 4a65b76..ff25fae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -251,11 +251,6 @@ public abstract class PhoenixEmbeddedDriver implements 
Driver, org.apache.phoeni
             return new ConnectionInfo(quorum,port,rootNode, principal, 
keytabFile);
         }
         
-        public static String getZookeeperConnectionString(String url) throws 
SQLException {
-               ConnectionInfo connInfo = ConnectionInfo.create(url);
-               return connInfo.getZookeeperQuorum()+":"+connInfo.getPort();
-        }
-        
         public ConnectionInfo normalize(ReadOnlyProps props) throws 
SQLException {
             String zookeeperQuorum = this.getZookeeperQuorum();
             Integer port = this.getPort();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 746c0b7..2c0021f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -236,9 +236,13 @@ public class PhoenixStatement implements Statement, 
SQLCloseable, org.apache.pho
                         QueryPlan plan = 
stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
                         plan = 
connection.getQueryServices().getOptimizer().optimize(
                                 PhoenixStatement.this, plan);
+                        startTransaction(plan);
                          // this will create its own trace internally, so we 
don't wrap this
                          // whole thing in tracing
                         ResultIterator resultIterator = plan.iterator();
+                        if (connection.getAutoCommit()) {
+                            connection.commit(); // Forces new read point for 
next statement
+                        }
                         if (logger.isDebugEnabled()) {
                             String explainPlan = 
QueryUtil.getExplainPlan(resultIterator);
                             logger.debug(LogUtil.addCustomAnnotations("Explain 
plan: " + explainPlan, connection));
@@ -271,6 +275,15 @@ public class PhoenixStatement implements Statement, 
SQLCloseable, org.apache.pho
         }
     }
     
+    private void startTransaction(StatementPlan plan) throws SQLException {
+        for (TableRef ref : plan.getContext().getResolver().getTables()) {
+            if (ref.getTable().isTransactional()) {
+                connection.startTransaction();
+                break;
+            }
+        }
+    }
+    
     protected int executeMutation(final CompilableStatement stmt) throws 
SQLException {
         if (connection.isReadOnly()) {
             throw new SQLExceptionInfo.Builder(
@@ -289,6 +302,7 @@ public class PhoenixStatement implements Statement, 
SQLCloseable, org.apache.pho
                             // the latest state
                             try {
                                 MutationPlan plan = 
stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
+                                startTransaction(plan);
                                 MutationState state = plan.execute();
                                 connection.getMutationState().join(state);
                                 if (connection.getAutoCommit()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 09705c6..57b5ac6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import co.cask.tephra.TransactionSystemClient;
+
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -111,4 +113,6 @@ public interface ConnectionQueryServices extends 
QueryServices, MetaDataMutated
     
     public void clearCache() throws SQLException;
     public int getSequenceSaltBuckets();
+
+    TransactionSystemClient getTransactionSystemClient();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 8a1671b..4b23d10 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -42,6 +42,9 @@ import java.util.concurrent.TimeoutException;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.distributed.PooledClientProvider;
+import co.cask.tephra.distributed.TransactionServiceClient;
 import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
 
 import org.apache.hadoop.conf.Configuration;
@@ -151,6 +154,11 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.UpgradeUtil;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,6 +181,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     // Max number of cached table stats for view or shared index physical 
tables
     private static final int MAX_TABLE_STATS_CACHE_ENTRIES = 512;
     protected final Configuration config;
+    private final ConnectionInfo connectionInfo;
     // Copy of config.getProps(), but read-only to prevent synchronization 
that we
     // don't need.
     private final ReadOnlyProps props;
@@ -194,6 +203,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     private final Object connectionCountLock = new Object();
 
     private HConnection connection;
+    private TransactionServiceClient txServiceClient;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
 
@@ -245,6 +255,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         for (Entry<String,String> entry : connectionInfo.asProps()) {
             config.set(entry.getKey(), entry.getValue());
         }
+        this.connectionInfo = connectionInfo;
 
         // Without making a copy of the configuration we cons up, we lose some 
of our properties
         // on the server side during testing.
@@ -269,6 +280,30 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 .build();
     }
 
+    @Override
+    public TransactionSystemClient getTransactionSystemClient() {
+        return txServiceClient;
+    }
+    
+    private void initTxServiceClient() {
+        String zkQuorumServersString = connectionInfo.getZookeeperQuorum();
+        ZKClientService zkClientService = ZKClientServices.delegate(
+                  ZKClients.reWatchOnExpire(
+                    ZKClients.retryOnFailure(
+                      ZKClientService.Builder.of(zkQuorumServersString)
+                        
.setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, 
HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+                        .build(),
+                      RetryStrategies.exponentialDelay(500, 2000, 
TimeUnit.MILLISECONDS)
+                    )
+                  )
+                );
+        zkClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new 
ZKDiscoveryService(zkClientService);
+        PooledClientProvider pooledClientProvider = new PooledClientProvider(
+                config, zkDiscoveryService);
+        this.txServiceClient = new 
TransactionServiceClient(config,pooledClientProvider);
+    }
+    
     private void openConnection() throws SQLException {
         try {
             // check if we need to authenticate with kerberos
@@ -280,6 +315,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 User.login(config, HBASE_CLIENT_KEYTAB, 
HBASE_CLIENT_PRINCIPAL, null);
                 logger.info("Successfull login to secure cluster!!");
             }
+            initTxServiceClient();
             this.connection = 
HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
         } catch (IOException e) {
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 742c38e..82b0e1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -23,9 +23,15 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
+import co.cask.tephra.TransactionManager;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.inmemory.InMemoryTxSystemClient;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -95,17 +101,37 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
     private PMetaData metaData;
     private final Map<SequenceKey, SequenceInfo> sequenceMap = 
Maps.newHashMap();
     private final String userName;
+    private final TransactionSystemClient txSystemClient;
     private KeyValueBuilder kvBuilder;
     private volatile boolean initialized;
     private volatile SQLException initializationException;
     private final Map<String, List<HRegionLocation>> tableSplits = 
Maps.newHashMap();
     
-    public ConnectionlessQueryServicesImpl(QueryServices queryServices, 
ConnectionInfo connInfo) {
-        super(queryServices);
+    public ConnectionlessQueryServicesImpl(QueryServices services, 
ConnectionInfo connInfo, Properties info) {
+        super(services);
         userName = connInfo.getPrincipal();
         metaData = newEmptyMetaData();
         // Use KeyValueBuilder that builds real KeyValues, as our test utils 
require this
         this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
+        // TODO: copy/paste from ConnectionQueryServicesImpl
+        Configuration config = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        for (Entry<String,String> entry : services.getProps()) {
+            config.set(entry.getKey(), entry.getValue());
+        }
+        if (info != null) {
+            for (Object key : info.keySet()) {
+                config.set((String) key, info.getProperty((String) key));
+            }
+        }
+        for (Entry<String,String> entry : connInfo.asProps()) {
+            config.set(entry.getKey(), entry.getValue());
+        }
+
+        // Without making a copy of the configuration we cons up, we lose some 
of our properties
+        // on the server side during testing.
+        config = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
+        TransactionManager txnManager = new TransactionManager(config);
+        this.txSystemClient = new InMemoryTxSystemClient(txnManager);
     }
 
     private PMetaData newEmptyMetaData() {
@@ -479,5 +505,10 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
         return getProps().getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
                 QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
     }
+
+    @Override
+    public TransactionSystemClient getTransactionSystemClient() {
+        return txSystemClient;
+    }
  
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index e2c9544..cf593e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import co.cask.tephra.TransactionSystemClient;
+
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -250,4 +252,9 @@ public class DelegateConnectionQueryServices extends 
DelegateQueryServices imple
     public int getSequenceSaltBuckets() {
         return getDelegate().getSequenceSaltBuckets();
     }
+
+    @Override
+    public TransactionSystemClient getTransactionSystemClient() {
+        return getDelegate().getTransactionSystemClient();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
index 0d3c461..0b6573c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
@@ -89,9 +89,9 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
         if (connectionQueryServices != null) { return connectionQueryServices; 
}
         ConnectionInfo connInfo = ConnectionInfo.create(url);
         if (connInfo.isConnectionless()) {
-            connectionQueryServices = new 
ConnectionlessQueryServicesImpl(queryServices, connInfo);
+            connectionQueryServices = new 
ConnectionlessQueryServicesImpl(queryServices, connInfo, info);
         } else {
-            connectionQueryServices = new 
ConnectionQueryServicesTestImpl(queryServices, connInfo);
+            connectionQueryServices = new 
ConnectionQueryServicesTestImpl(queryServices, connInfo, info);
         }
         connectionQueryServices.init(url, info);
         return connectionQueryServices;

Reply via email to