remove additional tephra stuff

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f090dd24
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f090dd24
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f090dd24

Branch: refs/heads/omid
Commit: f090dd24e403ae8656f5041cd86cb09eb7ad68ff
Parents: f5b19f1
Author: Ohad Shacham <[email protected]>
Authored: Sun May 7 15:48:07 2017 +0300
Committer: Ohad Shacham <[email protected]>
Committed: Sun May 7 15:48:07 2017 +0300

----------------------------------------------------------------------
 .../phoenix/tx/FlappingTransactionIT.java       |  8 +--
 .../phoenix/tx/ParameterizedTransactionIT.java  |  4 +-
 .../org/apache/phoenix/tx/TransactionIT.java    |  1 -
 .../apache/phoenix/index/IndexMaintainer.java   |  4 +-
 .../phoenix/query/ConnectionQueryServices.java  |  2 -
 .../query/ConnectionQueryServicesImpl.java      | 62 ++++++-------------
 .../query/ConnectionlessQueryServicesImpl.java  | 15 +----
 .../query/DelegateConnectionQueryServices.java  |  6 --
 .../org/apache/phoenix/schema/PTableImpl.java   |  6 +-
 .../transaction/OmidTransactionContext.java     | 25 ++++++++
 .../transaction/PhoenixTransactionContext.java  | 26 ++++++++
 .../transaction/TephraTransactionContext.java   | 64 +++++++++++++++++---
 12 files changed, 136 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
index 0bc7c24..97b5e71 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
@@ -47,7 +47,6 @@ import 
org.apache.phoenix.transaction.PhoenixTransactionalTable;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.apache.tephra.TransactionSystemClient;
 import org.junit.Test;
 
 /**
@@ -213,8 +212,6 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         String fullTableName = generateUniqueName();
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         
-        TransactionSystemClient txServiceClient = 
pconn.getQueryServices().getTransactionSystemClient();
-
         Statement stmt = conn.createStatement();
         stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY 
KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true");
         HTableInterface htable = 
pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName));
@@ -227,7 +224,6 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
             assertEquals(1,rs.getInt(1));
         }
 
-        // Use HBase level Tephra APIs to start a new transaction
         //TransactionAwareHTable txAware = new TransactionAwareHTable(htable, 
TxConstants.ConflictDetection.ROW);
         PhoenixTransactionContext txContext = 
TransactionFactory.getTransactionFactory().getTransactionContext(pconn);
         PhoenixTransactionalTable txTable = 
TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, 
htable);
@@ -260,7 +256,7 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         assertTrue(rs.next());
         assertEquals(3,rs.getInt(1));
         
-        // Use Tephra APIs directly to finish (i.e. commit) the transaction
+        // Use TM APIs directly to finish (i.e. commit) the transaction
         txContext.commit();
         
         // Confirm that attempt to commit row with conflict fails
@@ -306,7 +302,7 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         assertTrue(rs.next());
         assertEquals(4,rs.getInt(1));
 
-        // Use Tephra APIs directly to abort (i.e. rollback) the transaction
+        // Use TM APIs directly to abort (i.e. rollback) the transaction
         txContext.abort();
         
         rs = conn.createStatement().executeQuery("select count(*) from " + 
fullTableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
index badf39b..fcb463c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
@@ -53,10 +53,10 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.apache.tephra.TxConstants;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -391,7 +391,7 @@ public class ParameterizedTransactionIT extends 
ParallelStatsDisabledIT {
         admin.createTable(desc);
         ddl = "CREATE TABLE " + t2 + " (k varchar primary key) 
transactional=true";
         conn.createStatement().execute(ddl);
-        assertEquals(Boolean.TRUE.toString(), 
admin.getTableDescriptor(TableName.valueOf(t2)).getValue(TxConstants.READ_NON_TX_DATA));
+        assertEquals(Boolean.TRUE.toString(), 
admin.getTableDescriptor(TableName.valueOf(t2)).getValue(PhoenixTransactionContext.READ_NON_TX_DATA));
         
         // Should be ok, as HBase metadata should match existing metadata.
         ddl = "CREATE TABLE IF NOT EXISTS " + t1 + " (k varchar primary key)"; 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 78c510b..46cc953 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -47,7 +47,6 @@ import 
org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.apache.tephra.TxConstants;
 import org.junit.Test;
 
 public class TransactionIT  extends ParallelStatsDisabledIT {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 2224e38..19ba609 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -100,6 +100,7 @@ import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.tuple.BaseTuple;
 import org.apache.phoenix.schema.tuple.ValueGetterTuple;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.BitSet;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -108,7 +109,6 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
-import org.apache.tephra.TxConstants;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -1064,7 +1064,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             }
                else if (kv.getTypeByte() == 
KeyValue.Type.DeleteFamily.getCode()
                                // Since we don't include the index rows in the 
change set for txn tables, we need to detect row deletes that have transformed 
by TransactionProcessor
-                               || (CellUtil.matchingQualifier(kv, 
TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, 
HConstants.EMPTY_BYTE_ARRAY))) {
+                               || (CellUtil.matchingQualifier(kv, 
TransactionFactory.getTransactionFactory().getTransactionContext().get_famility_delete_marker())
 && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
                    nDeleteCF++;
                }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 38580e4..45ab5fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -46,7 +46,6 @@ import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
-import org.apache.tephra.TransactionSystemClient;
 
 
 public interface ConnectionQueryServices extends QueryServices, 
MetaDataMutated {
@@ -132,7 +131,6 @@ public interface ConnectionQueryServices extends 
QueryServices, MetaDataMutated
     public long clearCache() throws SQLException;
     public int getSequenceSaltBuckets();
 
-    TransactionSystemClient getTransactionSystemClient();
     public long getRenewLeaseThresholdMilliSeconds();
     public boolean isRenewingLeasesEnabled();
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 59252ad..815f669 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -219,6 +219,8 @@ import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.ConfigUtil;
@@ -232,11 +234,6 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.UpgradeUtil;
-import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.distributed.PooledClientProvider;
-import org.apache.tephra.distributed.TransactionServiceClient;
-import org.apache.tephra.zookeeper.TephraZKClientService;
 import org.apache.twill.discovery.ZKDiscoveryService;
 import org.apache.twill.zookeeper.RetryStrategies;
 import org.apache.twill.zookeeper.ZKClientService;
@@ -287,7 +284,6 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
 
     private HConnection connection;
     private ZKClientService txZKClientService;
-    private TransactionServiceClient txServiceClient;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
 
@@ -396,32 +392,8 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
 
     }
 
-    @Override
-    public TransactionSystemClient getTransactionSystemClient() {
-        return txServiceClient;
-    }
-
     private void initTxServiceClient() {
-        String zkQuorumServersString = 
this.getProps().get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
-        if (zkQuorumServersString==null) {
-            zkQuorumServersString = 
connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort();
-        }
-
-        int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, 
HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
-        // Create instance of the tephra zookeeper client
-        txZKClientService = ZKClientServices.delegate(
-            ZKClients.reWatchOnExpire(
-                ZKClients.retryOnFailure(
-                     new TephraZKClientService(zkQuorumServersString, timeOut, 
null,
-                             ArrayListMultimap.<String, byte[]>create()), 
-                         RetryStrategies.exponentialDelay(500, 2000, 
TimeUnit.MILLISECONDS))
-                     )
-                );
-        txZKClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new 
ZKDiscoveryService(txZKClientService);
-        PooledClientProvider pooledClientProvider = new PooledClientProvider(
-                config, zkDiscoveryService);
-        this.txServiceClient = new 
TransactionServiceClient(config,pooledClientProvider);
+        txZKClientService = 
TransactionFactory.getTransactionFactory().getTransactionContext().setTransactionClient(config,
 props, connectionInfo);
     }
 
     private void openConnection() throws SQLException {
@@ -862,7 +834,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             }
             boolean isTransactional =
                     
Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) ||
-                    
Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); // For ALTER 
TABLE
+                    
Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA));
 // For ALTER TABLE
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing 
coprocessor for indexes.
             // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table 
because we use
@@ -1125,7 +1097,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 // If mapping an existing table as transactional, set property 
so that existing
                 // data is correctly read.
                 if (willBeTx) {
-                    newDesc.setValue(TxConstants.READ_NON_TX_DATA, 
Boolean.TRUE.toString());
+                    
newDesc.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, 
Boolean.TRUE.toString());
                 } else {
                     // If we think we're creating a non transactional table 
when it's already
                     // transactional, don't allow.
@@ -1134,7 +1106,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                         
.setSchemaName(SchemaUtil.getSchemaNameFromFullName(tableName))
                         
.setTableName(SchemaUtil.getTableNameFromFullName(tableName)).build().buildException();
                     }
-                    newDesc.remove(TxConstants.READ_NON_TX_DATA);
+                    newDesc.remove(PhoenixTransactionContext.READ_NON_TX_DATA);
                 }
                 if (existingDesc.equals(newDesc)) {
                     return null; // Indicate that no metadata was changed
@@ -1754,7 +1726,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             origTableDescriptors = Sets.newHashSetWithExpectedSize(3 + 
table.getIndexes().size());
             tableDescriptors.add(tableDescriptor);
             origTableDescriptors.add(origTableDescriptor);
-            nonTxToTx = 
Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA));
+            nonTxToTx = 
Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA));
             /*
              * If the table was transitioned from non transactional to 
transactional, we need
              * to also transition the index tables.
@@ -1864,7 +1836,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 indexTableProps = Collections.<String,Object>emptyMap();
             } else {
                 indexTableProps = Maps.newHashMapWithExpectedSize(1);
-                indexTableProps.put(TxConstants.READ_NON_TX_DATA, 
Boolean.valueOf(txValue));
+                
indexTableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, 
Boolean.valueOf(txValue));
             }
             for (PTable index : table.getIndexes()) {
                 HTableDescriptor indexDescriptor = 
admin.getTableDescriptor(index.getPhysicalName().getBytes());
@@ -1877,7 +1849,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                     HColumnDescriptor indexColDescriptor = 
indexDescriptor.getFamily(indexFamilyName);
                     HColumnDescriptor tableColDescriptor = 
tableDescriptor.getFamily(dataFamilyName);
                     
indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
-                    indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, 
tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+                    
indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, 
tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL));
                 } else {
                     for (PColumnFamily family : index.getColumnFamilies()) {
                         byte[] familyName = family.getName().getBytes();
@@ -1885,7 +1857,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                         HColumnDescriptor indexColDescriptor = 
indexDescriptor.getFamily(familyName);
                         HColumnDescriptor tableColDescriptor = 
tableDescriptor.getFamily(familyName);
                         
indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
-                        indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, 
tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+                        
indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, 
tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL));
                     }
                 }
                 setTransactional(indexDescriptor, index.getType(), txValue, 
indexTableProps);
@@ -1921,7 +1893,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             HColumnDescriptor indexColDescriptor = 
indexDescriptor.getFamily(familyName);
             HColumnDescriptor tableColDescriptor = 
tableDescriptor.getFamily(familyName);
             
indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
-            indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, 
tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+            
indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, 
tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL));
         } else {
             for (PColumnFamily family : table.getColumnFamilies()) {
                 byte[] familyName = family.getName().getBytes();
@@ -1929,7 +1901,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 if (indexColDescriptor != null) {
                     HColumnDescriptor tableColDescriptor = 
tableDescriptor.getFamily(familyName);
                     
indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
-                    indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, 
tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+                    
indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, 
tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL));
                 }
             }
         }
@@ -1957,9 +1929,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     }
     private void setTransactional(HTableDescriptor tableDescriptor, PTableType 
tableType, String txValue, Map<String, Object> tableProps) throws SQLException {
         if (txValue == null) {
-            tableDescriptor.remove(TxConstants.READ_NON_TX_DATA);
+            tableDescriptor.remove(PhoenixTransactionContext.READ_NON_TX_DATA);
         } else {
-            tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, txValue);
+            
tableDescriptor.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, txValue);
         }
         this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, 
tableType, tableProps);
     }
@@ -2005,7 +1977,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                 commonFamilyProps.put(propName, 
prop.getSecond());
                             } else if 
(propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && 
Boolean.TRUE.equals(propValue)) {
                                 willBeTransactional = isOrWillBeTransactional 
= true;
-                                tableProps.put(TxConstants.READ_NON_TX_DATA, 
propValue);
+                                
tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, propValue);
                             }
                         } else {
                             if (MetaDataUtil.isHColumnProperty(propName)) {
@@ -2172,10 +2144,10 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                         if (props == null) {
                             props = new HashMap<String, Object>();
                         }
-                        props.put(TxConstants.PROPERTY_TTL, ttl);
+                        props.put(PhoenixTransactionContext.PROPERTY_TTL, ttl);
                         // Remove HBase TTL if we're not transitioning an 
existing table to become transactional
                         // or if the existing transactional table wasn't 
originally non transactional.
-                        if (!willBeTransactional && 
!Boolean.valueOf(newTableDescriptor.getValue(TxConstants.READ_NON_TX_DATA))) {
+                        if (!willBeTransactional && 
!Boolean.valueOf(newTableDescriptor.getValue(PhoenixTransactionContext.READ_NON_TX_DATA)))
 {
                             props.remove(TTL);
                         }
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 47ef954..8e72e74 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -80,15 +80,13 @@ import 
org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.JDBCUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SequenceUtil;
-import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -107,7 +105,6 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
     private PMetaData metaData;
     private final Map<SequenceKey, SequenceInfo> sequenceMap = 
Maps.newHashMap();
     private final String userName;
-    private final TransactionSystemClient txSystemClient;
     private KeyValueBuilder kvBuilder;
     private volatile boolean initialized;
     private volatile SQLException initializationException;
@@ -119,7 +116,7 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
         super(services);
         userName = connInfo.getPrincipal();
         metaData = newEmptyMetaData();
-        
+
         // Use KeyValueBuilder that builds real KeyValues, as our test utils 
require this
         this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
         Configuration config = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
@@ -138,8 +135,7 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
         // Without making a copy of the configuration we cons up, we lose some 
of our properties
         // on the server side during testing.
         this.config = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
-        TransactionManager txnManager = new TransactionManager(config);
-        this.txSystemClient = new InMemoryTxSystemClient(txnManager);
+        
TransactionFactory.getTransactionFactory().getTransactionContext().setInMemoryTransactionClient(config);
         this.guidePostsCache = new GuidePostsCache(this, config);
     }
 
@@ -531,11 +527,6 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
                 QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
     }
 
-    @Override
-    public TransactionSystemClient getTransactionSystemClient() {
-        return txSystemClient;
-    }
- 
     public MetaDataMutationResult createFunction(List<Mutation> functionData, 
PFunction function, boolean temporary)
             throws SQLException {
         return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0l, 
null);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 7f7c027..6c464eb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -47,7 +47,6 @@ import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
-import org.apache.tephra.TransactionSystemClient;
 
 
 public class DelegateConnectionQueryServices extends DelegateQueryServices 
implements ConnectionQueryServices {
@@ -257,11 +256,6 @@ public class DelegateConnectionQueryServices extends 
DelegateQueryServices imple
     }
 
     @Override
-    public TransactionSystemClient getTransactionSystemClient() {
-        return getDelegate().getTransactionSystemClient();
-    }
-
-    @Override
     public MetaDataMutationResult createFunction(List<Mutation> functionData, 
PFunction function, boolean temporary)
             throws SQLException {
         return getDelegate().createFunction(functionData, function, temporary);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index d91ebcb..ab8fe5c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -69,13 +69,13 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
-import org.apache.tephra.TxConstants;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -1022,11 +1022,11 @@ public class PTableImpl implements PTable {
             if (PTableImpl.this.isTransactional()) {
                 Put put = new Put(key);
                 if (families.isEmpty()) {
-                    put.add(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), 
TxConstants.FAMILY_DELETE_QUALIFIER, ts,
+                    put.add(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), 
TransactionFactory.getTransactionFactory().getTransactionContext().get_famility_delete_marker(),
 ts,
                             HConstants.EMPTY_BYTE_ARRAY);
                 } else {
                     for (PColumnFamily colFamily : families) {
-                        put.add(colFamily.getName().getBytes(), 
TxConstants.FAMILY_DELETE_QUALIFIER, ts,
+                        put.add(colFamily.getName().getBytes(), 
TransactionFactory.getTransactionFactory().getTransactionContext().get_famility_delete_marker(),
 ts,
                                 HConstants.EMPTY_BYTE_ARRAY);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index d122d0c..cec07d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -3,8 +3,12 @@ package org.apache.phoenix.transaction;
 import java.sql.SQLException;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.twill.zookeeper.ZKClientService;
 import org.slf4j.Logger;
 
 public class OmidTransactionContext implements PhoenixTransactionContext {
@@ -116,4 +120,25 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
         // TODO Auto-generated method stub
         return null;
     }
+
+    @Override
+    public void setInMemoryTransactionClient(Configuration config) {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props,
+            ConnectionInfo connectionInfo) {
+        // TODO Auto-generated method stub
+        
+        return null;
+        
+    }
+
+    @Override
+    public byte[] get_famility_delete_marker() {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index 0854f4e..36f7804 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -1,7 +1,11 @@
 package org.apache.phoenix.transaction;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.twill.zookeeper.ZKClientService;
 import org.slf4j.Logger;
 
 import java.sql.SQLException;
@@ -27,6 +31,22 @@ public interface PhoenixTransactionContext {
     public static final String READ_NON_TX_DATA = "data.tx.read.pre.existing";
 
     /**
+     * Set the in memory client connection to the transaction manager (for testing purpose)
+     *
+     * @param config
+     */
+    public void setInMemoryTransactionClient(Configuration config);
+
+    /**
+     * Set the client connection to the transaction manager
+     *
+     * @param config
+     * @param props
+     * @param connectionInfo
+     */
+    public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo);
+
+    /**
      * Starts a transaction
      *
      * @throws SQLException
@@ -138,4 +158,10 @@ public interface PhoenixTransactionContext {
      * @return the coprocessor
      */
     public BaseRegionObserver getCoProcessor();
+
+    /**
+     * 
+     * @return the family delete marker
+     */
+    public byte[] get_famility_delete_marker(); 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index a5e6e64..0334826 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -10,12 +10,13 @@ import java.util.concurrent.TimeoutException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionAware;
 import org.apache.tephra.TransactionCodec;
@@ -26,12 +27,21 @@ import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.Transaction.VisibilityLevel;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.PooledClientProvider;
+import org.apache.tephra.distributed.TransactionServiceClient;
 import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import org.apache.tephra.util.TxUtils;
 import org.apache.tephra.visibility.FenceWait;
 import org.apache.tephra.visibility.VisibilityFence;
-
+import org.apache.tephra.zookeeper.TephraZKClientService;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 
 import org.slf4j.Logger;
@@ -40,6 +50,8 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
 
     private static final TransactionCodec CODEC = new TransactionCodec();
 
+    private static TransactionSystemClient txClient = null;
+
     private final List<TransactionAware> txAwares;
     private final TransactionContext txContext;
     private Transaction tx;
@@ -59,17 +71,16 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
     }
 
     public TephraTransactionContext(PhoenixConnection connection) {
-        this.txServiceClient = connection.getQueryServices()
-                .getTransactionSystemClient();
+        assert(txClient != null);
+        this.txServiceClient = txClient;  
         this.txAwares = Collections.emptyList();
         this.txContext = new TransactionContext(txServiceClient);
     }
 
     public TephraTransactionContext(PhoenixTransactionContext ctx,
             PhoenixConnection connection, boolean subTask) {
-        this.txServiceClient = connection.getQueryServices()
-                .getTransactionSystemClient();
-
+        assert(txClient != null);
+        this.txServiceClient = txClient;  
         assert (ctx instanceof TephraTransactionContext);
         TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx;
 
@@ -86,6 +97,38 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
     }
 
     @Override
+    public void setInMemoryTransactionClient(Configuration config) {
+        TransactionManager txnManager = new TransactionManager(config);
+        txClient = this.txServiceClient = new InMemoryTxSystemClient(txnManager);
+    }
+
+    @Override
+    public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo) {
+        String zkQuorumServersString = props.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
+        if (zkQuorumServersString==null) {
+            zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort();
+        }
+
+        int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
+        // Create instance of the tephra zookeeper client
+        ZKClientService txZKClientService  = ZKClientServices.delegate(
+            ZKClients.reWatchOnExpire(
+                ZKClients.retryOnFailure(
+                     new TephraZKClientService(zkQuorumServersString, timeOut, null,
+                             ArrayListMultimap.<String, byte[]>create()), 
+                         RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
+                     )
+                );
+        txZKClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
+        PooledClientProvider pooledClientProvider = new PooledClientProvider(
+                config, zkDiscoveryService);
+        txClient = this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
+        
+        return txZKClientService;
+    }
+
+    @Override
     public void begin() throws SQLException {
         if (txContext == null) {
             throw new SQLExceptionInfo.Builder(
@@ -362,6 +405,11 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
         return new TransactionProcessor();
     }
 
+    @Override
+    public byte[] get_famility_delete_marker() { 
+        return TxConstants.FAMILY_DELETE_QUALIFIER;
+    }
+
     /**
      * TephraTransactionContext specific functions
      */

Reply via email to