Remove additional direct Tephra dependencies (route them through PhoenixTransactionContext)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f090dd24 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f090dd24 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f090dd24 Branch: refs/heads/omid Commit: f090dd24e403ae8656f5041cd86cb09eb7ad68ff Parents: f5b19f1 Author: Ohad Shacham <[email protected]> Authored: Sun May 7 15:48:07 2017 +0300 Committer: Ohad Shacham <[email protected]> Committed: Sun May 7 15:48:07 2017 +0300 ---------------------------------------------------------------------- .../phoenix/tx/FlappingTransactionIT.java | 8 +-- .../phoenix/tx/ParameterizedTransactionIT.java | 4 +- .../org/apache/phoenix/tx/TransactionIT.java | 1 - .../apache/phoenix/index/IndexMaintainer.java | 4 +- .../phoenix/query/ConnectionQueryServices.java | 2 - .../query/ConnectionQueryServicesImpl.java | 62 ++++++------------- .../query/ConnectionlessQueryServicesImpl.java | 15 +---- .../query/DelegateConnectionQueryServices.java | 6 -- .../org/apache/phoenix/schema/PTableImpl.java | 6 +- .../transaction/OmidTransactionContext.java | 25 ++++++++ .../transaction/PhoenixTransactionContext.java | 26 ++++++++ .../transaction/TephraTransactionContext.java | 64 +++++++++++++++++--- 12 files changed, 136 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java index 0bc7c24..97b5e71 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java @@ -47,7 +47,6 @@ import 
org.apache.phoenix.transaction.PhoenixTransactionalTable; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; -import org.apache.tephra.TransactionSystemClient; import org.junit.Test; /** @@ -213,8 +212,6 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { String fullTableName = generateUniqueName(); PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - TransactionSystemClient txServiceClient = pconn.getQueryServices().getTransactionSystemClient(); - Statement stmt = conn.createStatement(); stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true"); HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName)); @@ -227,7 +224,6 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { assertEquals(1,rs.getInt(1)); } - // Use HBase level Tephra APIs to start a new transaction //TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW); PhoenixTransactionContext txContext = TransactionFactory.getTransactionFactory().getTransactionContext(pconn); PhoenixTransactionalTable txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable); @@ -260,7 +256,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { assertTrue(rs.next()); assertEquals(3,rs.getInt(1)); - // Use Tephra APIs directly to finish (i.e. commit) the transaction + // Use TM APIs directly to finish (i.e. commit) the transaction txContext.commit(); // Confirm that attempt to commit row with conflict fails @@ -306,7 +302,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { assertTrue(rs.next()); assertEquals(4,rs.getInt(1)); - // Use Tephra APIs directly to abort (i.e. rollback) the transaction + // Use TM APIs directly to abort (i.e. 
rollback) the transaction txContext.abort(); rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java index badf39b..fcb463c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java @@ -53,10 +53,10 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; -import org.apache.tephra.TxConstants; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -391,7 +391,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { admin.createTable(desc); ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true"; conn.createStatement().execute(ddl); - assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf(t2)).getValue(TxConstants.READ_NON_TX_DATA)); + assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf(t2)).getValue(PhoenixTransactionContext.READ_NON_TX_DATA)); // Should be ok, as HBase metadata should match existing metadata. 
ddl = "CREATE TABLE IF NOT EXISTS " + t1 + " (k varchar primary key)"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 78c510b..46cc953 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -47,7 +47,6 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TestUtil; -import org.apache.tephra.TxConstants; import org.junit.Test; public class TransactionIT extends ParallelStatsDisabledIT { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 2224e38..19ba609 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -100,6 +100,7 @@ import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.tuple.BaseTuple; import org.apache.phoenix.schema.tuple.ValueGetterTuple; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.BitSet; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; @@ -108,7 +109,6 @@ import org.apache.phoenix.util.IndexUtil; import 
org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; -import org.apache.tephra.TxConstants; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -1064,7 +1064,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() // Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor - || (CellUtil.matchingQualifier(kv, TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) { + || (CellUtil.matchingQualifier(kv, TransactionFactory.getTransactionFactory().getTransactionContext().get_famility_delete_marker()) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) { nDeleteCF++; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 38580e4..45ab5fa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -46,7 +46,6 @@ import org.apache.phoenix.schema.SequenceAllocation; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; -import org.apache.tephra.TransactionSystemClient; public interface ConnectionQueryServices extends QueryServices, MetaDataMutated { @@ -132,7 +131,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public 
long clearCache() throws SQLException; public int getSequenceSaltBuckets(); - TransactionSystemClient getTransactionSystemClient(); public long getRenewLeaseThresholdMilliSeconds(); public boolean isRenewingLeasesEnabled(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 59252ad..815f669 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -219,6 +219,8 @@ import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PUnsignedTinyint; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.ConfigUtil; @@ -232,11 +234,6 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.UpgradeUtil; -import org.apache.tephra.TransactionSystemClient; -import org.apache.tephra.TxConstants; -import org.apache.tephra.distributed.PooledClientProvider; -import org.apache.tephra.distributed.TransactionServiceClient; -import org.apache.tephra.zookeeper.TephraZKClientService; import org.apache.twill.discovery.ZKDiscoveryService; import org.apache.twill.zookeeper.RetryStrategies; import org.apache.twill.zookeeper.ZKClientService; @@ -287,7 +284,6 @@ public class 
ConnectionQueryServicesImpl extends DelegateQueryServices implement private HConnection connection; private ZKClientService txZKClientService; - private TransactionServiceClient txServiceClient; private volatile boolean initialized; private volatile int nSequenceSaltBuckets; @@ -396,32 +392,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } - @Override - public TransactionSystemClient getTransactionSystemClient() { - return txServiceClient; - } - private void initTxServiceClient() { - String zkQuorumServersString = this.getProps().get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM); - if (zkQuorumServersString==null) { - zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort(); - } - - int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); - // Create instance of the tephra zookeeper client - txZKClientService = ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure( - new TephraZKClientService(zkQuorumServersString, timeOut, null, - ArrayListMultimap.<String, byte[]>create()), - RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)) - ) - ); - txZKClientService.startAndWait(); - ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService); - PooledClientProvider pooledClientProvider = new PooledClientProvider( - config, zkDiscoveryService); - this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider); + txZKClientService = TransactionFactory.getTransactionFactory().getTransactionContext().setTransactionClient(config, props, connectionInfo); } private void openConnection() throws SQLException { @@ -862,7 +834,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } boolean isTransactional = Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || - Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); // For 
ALTER TABLE + Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA)); // For ALTER TABLE // TODO: better encapsulation for this // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use @@ -1125,7 +1097,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // If mapping an existing table as transactional, set property so that existing // data is correctly read. if (willBeTx) { - newDesc.setValue(TxConstants.READ_NON_TX_DATA, Boolean.TRUE.toString()); + newDesc.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE.toString()); } else { // If we think we're creating a non transactional table when it's already // transactional, don't allow. @@ -1134,7 +1106,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement .setSchemaName(SchemaUtil.getSchemaNameFromFullName(tableName)) .setTableName(SchemaUtil.getTableNameFromFullName(tableName)).build().buildException(); } - newDesc.remove(TxConstants.READ_NON_TX_DATA); + newDesc.remove(PhoenixTransactionContext.READ_NON_TX_DATA); } if (existingDesc.equals(newDesc)) { return null; // Indicate that no metadata was changed @@ -1754,7 +1726,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement origTableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size()); tableDescriptors.add(tableDescriptor); origTableDescriptors.add(origTableDescriptor); - nonTxToTx = Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); + nonTxToTx = Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA)); /* * If the table was transitioned from non transactional to transactional, we need * to also transition the index tables. 
@@ -1864,7 +1836,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement indexTableProps = Collections.<String,Object>emptyMap(); } else { indexTableProps = Maps.newHashMapWithExpectedSize(1); - indexTableProps.put(TxConstants.READ_NON_TX_DATA, Boolean.valueOf(txValue)); + indexTableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.valueOf(txValue)); } for (PTable index : table.getIndexes()) { HTableDescriptor indexDescriptor = admin.getTableDescriptor(index.getPhysicalName().getBytes()); @@ -1877,7 +1849,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(indexFamilyName); HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(dataFamilyName); indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions()); - indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL)); + indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL)); } else { for (PColumnFamily family : index.getColumnFamilies()) { byte[] familyName = family.getName().getBytes(); @@ -1885,7 +1857,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName); HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName); indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions()); - indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL)); + indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL)); } } setTransactional(indexDescriptor, index.getType(), txValue, indexTableProps); @@ -1921,7 +1893,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName); HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName); indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions()); - indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL)); + indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL)); } else { for (PColumnFamily family : table.getColumnFamilies()) { byte[] familyName = family.getName().getBytes(); @@ -1929,7 +1901,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (indexColDescriptor != null) { HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName); indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions()); - indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL)); + indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL)); } } } @@ -1957,9 +1929,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } private void setTransactional(HTableDescriptor tableDescriptor, PTableType tableType, String txValue, Map<String, Object> tableProps) throws SQLException { if (txValue == null) { - tableDescriptor.remove(TxConstants.READ_NON_TX_DATA); + tableDescriptor.remove(PhoenixTransactionContext.READ_NON_TX_DATA); } else { - tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, txValue); + tableDescriptor.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, txValue); } this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps); } @@ -2005,7 +1977,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement commonFamilyProps.put(propName, prop.getSecond()); } else if 
(propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) { willBeTransactional = isOrWillBeTransactional = true; - tableProps.put(TxConstants.READ_NON_TX_DATA, propValue); + tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, propValue); } } else { if (MetaDataUtil.isHColumnProperty(propName)) { @@ -2172,10 +2144,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (props == null) { props = new HashMap<String, Object>(); } - props.put(TxConstants.PROPERTY_TTL, ttl); + props.put(PhoenixTransactionContext.PROPERTY_TTL, ttl); // Remove HBase TTL if we're not transitioning an existing table to become transactional // or if the existing transactional table wasn't originally non transactional. - if (!willBeTransactional && !Boolean.valueOf(newTableDescriptor.getValue(TxConstants.READ_NON_TX_DATA))) { + if (!willBeTransactional && !Boolean.valueOf(newTableDescriptor.getValue(PhoenixTransactionContext.READ_NON_TX_DATA))) { props.remove(TTL); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 47ef954..8e72e74 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -80,15 +80,13 @@ import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; +import org.apache.phoenix.transaction.TransactionFactory; import 
org.apache.phoenix.util.JDBCUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.SequenceUtil; -import org.apache.tephra.TransactionManager; -import org.apache.tephra.TransactionSystemClient; -import org.apache.tephra.inmemory.InMemoryTxSystemClient; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -107,7 +105,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple private PMetaData metaData; private final Map<SequenceKey, SequenceInfo> sequenceMap = Maps.newHashMap(); private final String userName; - private final TransactionSystemClient txSystemClient; private KeyValueBuilder kvBuilder; private volatile boolean initialized; private volatile SQLException initializationException; @@ -119,7 +116,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple super(services); userName = connInfo.getPrincipal(); metaData = newEmptyMetaData(); - + // Use KeyValueBuilder that builds real KeyValues, as our test utils require this this.kvBuilder = GenericKeyValueBuilder.INSTANCE; Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); @@ -138,8 +135,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple // Without making a copy of the configuration we cons up, we lose some of our properties // on the server side during testing. 
this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config); - TransactionManager txnManager = new TransactionManager(config); - this.txSystemClient = new InMemoryTxSystemClient(txnManager); + TransactionFactory.getTransactionFactory().getTransactionContext().setInMemoryTransactionClient(config); this.guidePostsCache = new GuidePostsCache(this, config); } @@ -531,11 +527,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); } - @Override - public TransactionSystemClient getTransactionSystemClient() { - return txSystemClient; - } - public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary) throws SQLException { return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0l, null); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 7f7c027..6c464eb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -47,7 +47,6 @@ import org.apache.phoenix.schema.SequenceAllocation; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; -import org.apache.tephra.TransactionSystemClient; public class DelegateConnectionQueryServices extends DelegateQueryServices implements ConnectionQueryServices { @@ -257,11 +256,6 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple } 
@Override - public TransactionSystemClient getTransactionSystemClient() { - return getDelegate().getTransactionSystemClient(); - } - - @Override public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary) throws SQLException { return getDelegate().createFunction(functionData, function, temporary); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index d91ebcb..ab8fe5c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -69,13 +69,13 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.SizedUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; -import org.apache.tephra.TxConstants; import com.google.common.base.Objects; import com.google.common.base.Preconditions; @@ -1022,11 +1022,11 @@ public class PTableImpl implements PTable { if (PTableImpl.this.isTransactional()) { Put put = new Put(key); if (families.isEmpty()) { - put.add(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), TxConstants.FAMILY_DELETE_QUALIFIER, ts, + put.add(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), TransactionFactory.getTransactionFactory().getTransactionContext().get_famility_delete_marker(), ts, 
HConstants.EMPTY_BYTE_ARRAY); } else { for (PColumnFamily colFamily : families) { - put.add(colFamily.getName().getBytes(), TxConstants.FAMILY_DELETE_QUALIFIER, ts, + put.add(colFamily.getName().getBytes(), TransactionFactory.getTransactionFactory().getTransactionContext().get_famility_delete_marker(), ts, HConstants.EMPTY_BYTE_ARRAY); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index d122d0c..cec07d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -3,8 +3,12 @@ package org.apache.phoenix.transaction; import java.sql.SQLException; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.twill.zookeeper.ZKClientService; import org.slf4j.Logger; public class OmidTransactionContext implements PhoenixTransactionContext { @@ -116,4 +120,25 @@ public class OmidTransactionContext implements PhoenixTransactionContext { // TODO Auto-generated method stub return null; } + + @Override + public void setInMemoryTransactionClient(Configuration config) { + // TODO Auto-generated method stub + + } + + @Override + public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, + ConnectionInfo connectionInfo) { + // TODO Auto-generated method stub + + return null; + + } + + @Override + 
public byte[] get_famility_delete_marker() { + // TODO Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java index 0854f4e..36f7804 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java @@ -1,7 +1,11 @@ package org.apache.phoenix.transaction; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.twill.zookeeper.ZKClientService; import org.slf4j.Logger; import java.sql.SQLException; @@ -27,6 +31,22 @@ public interface PhoenixTransactionContext { public static final String READ_NON_TX_DATA = "data.tx.read.pre.existing"; /** + * Set the in memory client connection to the transaction manager (for testing purpose) + * + * @param config + */ + public void setInMemoryTransactionClient(Configuration config); + + /** + * Set the client connection to the transaction manager + * + * @param config + * @param props + * @param connectionInfo + */ + public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo); + + /** * Starts a transaction * * @throws SQLException @@ -138,4 +158,10 @@ public interface PhoenixTransactionContext { * @return the coprocessor */ public BaseRegionObserver getCoProcessor(); + + /** + * + * @return the family delete marker + */ + 
public byte[] get_famility_delete_marker(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f090dd24/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java index a5e6e64..0334826 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java @@ -10,12 +10,13 @@ import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; +import org.apache.phoenix.util.ReadOnlyProps; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionAware; import org.apache.tephra.TransactionCodec; @@ -26,12 +27,21 @@ import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.Transaction.VisibilityLevel; import org.apache.tephra.TxConstants; +import org.apache.tephra.distributed.PooledClientProvider; +import org.apache.tephra.distributed.TransactionServiceClient; import org.apache.tephra.hbase.coprocessor.TransactionProcessor; import org.apache.tephra.inmemory.InMemoryTxSystemClient; import 
org.apache.tephra.util.TxUtils; import org.apache.tephra.visibility.FenceWait; import org.apache.tephra.visibility.VisibilityFence; - +import org.apache.tephra.zookeeper.TephraZKClientService; +import org.apache.twill.discovery.ZKDiscoveryService; +import org.apache.twill.zookeeper.RetryStrategies; +import org.apache.twill.zookeeper.ZKClientService; +import org.apache.twill.zookeeper.ZKClientServices; +import org.apache.twill.zookeeper.ZKClients; + +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -40,6 +50,8 @@ public class TephraTransactionContext implements PhoenixTransactionContext { private static final TransactionCodec CODEC = new TransactionCodec(); + private static TransactionSystemClient txClient = null; + private final List<TransactionAware> txAwares; private final TransactionContext txContext; private Transaction tx; @@ -59,17 +71,16 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } public TephraTransactionContext(PhoenixConnection connection) { - this.txServiceClient = connection.getQueryServices() - .getTransactionSystemClient(); + assert(txClient != null); + this.txServiceClient = txClient; this.txAwares = Collections.emptyList(); this.txContext = new TransactionContext(txServiceClient); } public TephraTransactionContext(PhoenixTransactionContext ctx, PhoenixConnection connection, boolean subTask) { - this.txServiceClient = connection.getQueryServices() - .getTransactionSystemClient(); - + assert(txClient != null); + this.txServiceClient = txClient; assert (ctx instanceof TephraTransactionContext); TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx; @@ -86,6 +97,38 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } @Override + public void setInMemoryTransactionClient(Configuration config) { + TransactionManager txnManager = new TransactionManager(config); + txClient = this.txServiceClient 
= new InMemoryTxSystemClient(txnManager); + } + + @Override + public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo) { + String zkQuorumServersString = props.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM); + if (zkQuorumServersString==null) { + zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort(); + } + + int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); + // Create instance of the tephra zookeeper client + ZKClientService txZKClientService = ZKClientServices.delegate( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + new TephraZKClientService(zkQuorumServersString, timeOut, null, + ArrayListMultimap.<String, byte[]>create()), + RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)) + ) + ); + txZKClientService.startAndWait(); + ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService); + PooledClientProvider pooledClientProvider = new PooledClientProvider( + config, zkDiscoveryService); + txClient = this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider); + + return txZKClientService; + } + + @Override public void begin() throws SQLException { if (txContext == null) { throw new SQLExceptionInfo.Builder( @@ -362,6 +405,11 @@ public class TephraTransactionContext implements PhoenixTransactionContext { return new TransactionProcessor(); } + @Override + public byte[] get_famility_delete_marker() { + return TxConstants.FAMILY_DELETE_QUALIFIER; + } + /** * TephraTransactionContext specific functions */
