Repository: phoenix Updated Branches: refs/heads/txn 8d5e3691b -> d1c2306da
Start txn on first stmt exec Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d1c2306d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d1c2306d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d1c2306d Branch: refs/heads/txn Commit: d1c2306daa6a3f93abeff0d5f8d177eb79ad5162 Parents: 8d5e369 Author: James Taylor <jtay...@salesforce.com> Authored: Thu Mar 12 16:47:55 2015 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Thu Mar 12 16:47:55 2015 -0700 ---------------------------------------------------------------------- .../ConnectionQueryServicesTestImpl.java | 2 +- .../apache/phoenix/execute/MutationState.java | 71 +++++----------- .../phoenix/iterate/TableResultIterator.java | 4 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 87 +++++++++++--------- .../org/apache/phoenix/jdbc/PhoenixDriver.java | 2 +- .../phoenix/jdbc/PhoenixEmbeddedDriver.java | 5 -- .../apache/phoenix/jdbc/PhoenixStatement.java | 14 ++++ .../phoenix/query/ConnectionQueryServices.java | 4 + .../query/ConnectionQueryServicesImpl.java | 36 ++++++++ .../query/ConnectionlessQueryServicesImpl.java | 35 +++++++- .../query/DelegateConnectionQueryServices.java | 7 ++ .../apache/phoenix/jdbc/PhoenixTestDriver.java | 4 +- 12 files changed, 166 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java index bee8d21..2edab92 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java @@ -35,7 +35,7 @@ import org.apache.phoenix.query.QueryServices; public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl { protected int NUM_SLAVES_BASE = 1; // number of slaves for the cluster - public ConnectionQueryServicesTestImpl(QueryServices services, ConnectionInfo info) throws SQLException { + public ConnectionQueryServicesTestImpl(QueryServices services, ConnectionInfo info, Properties info2) throws SQLException { super(services, info, null); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 45b9dd6..c452991 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -27,10 +27,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import co.cask.tephra.TransactionAware; import co.cask.tephra.hbase98.TransactionAwareHTable; import org.apache.hadoop.hbase.HConstants; @@ -89,8 +86,6 @@ public class MutationState implements SQLCloseable { // rows - map from rowkey to columns // columns - map from column to value private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing? - // map from table ref to htable (possibly a TransactionAwareHTable) - private Map<TableRef, HTableInterface> tableRefToHTableMap; private long sizeOffset; private int numRows = 0; @@ -360,29 +355,6 @@ public class MutationState implements SQLCloseable { } } - /** - * Creates a map from table ref to htable which is used by {@link PhoenixConnection} - * for transactions - * @return list of transaction aware htables - */ - public List<TransactionAware> preSend() throws SQLException { - List<TransactionAware> txAwareHTables = Lists.newArrayListWithExpectedSize(mutations.size()); - tableRefToHTableMap = Maps.newHashMapWithExpectedSize(mutations.size()); - for ( TableRef tableRef : this.mutations.keySet()) { - PTable table = tableRef.getTable(); - byte[] hTableName = table.getPhysicalName().getBytes(); - HTableInterface hTable = connection.getQueryServices().getTable(hTableName); - if (table.isTransactional()) { - TransactionAwareHTable transactionAwareHTable = new TransactionAwareHTable(hTable); - txAwareHTables.add(transactionAwareHTable); - hTable = transactionAwareHTable; - } - tableRefToHTableMap.put(tableRef, hTable); - } - return txAwareHTables; - } - - @SuppressWarnings("deprecation") public void send() throws SQLException { int i = 0; @@ -447,8 +419,15 @@ public class MutationState implements SQLCloseable { } SQLException sqlE = null; - HTableInterface hTable = tableRefToHTableMap.get(tableRef); + HTableInterface hTable = connection.getQueryServices().getTable(table.getPhysicalName().getBytes()); try { + // Don't add immutable indexes (those are the only ones that would participate + // during a commit), as we don't need conflict detection for these. + if (table.isTransactional() && table.getType() != PTableType.INDEX) { + TransactionAwareHTable txnAware = new TransactionAwareHTable(hTable); + connection.addTxParticipant(txnAware); + hTable = txnAware; + } logMutationSize(hTable, mutations, connection); MUTATION_BATCH_SIZE.update(mutations.size()); long startTime = System.currentTimeMillis(); @@ -489,6 +468,16 @@ public class MutationState implements SQLCloseable { cache.close(); } } finally { + try { + hTable.close(); + } + catch (IOException e) { + if (sqlE != null) { + sqlE.setNextException(ServerUtil.parseServerException(e)); + } else { + sqlE = ServerUtil.parseServerException(e); + } + } if (sqlE != null) { throw sqlE; } @@ -507,29 +496,7 @@ public class MutationState implements SQLCloseable { assert(this.mutations.isEmpty()); } - public void postSend() throws SQLException { - SQLException sqlE = null; - for (Entry<TableRef, HTableInterface> entry : tableRefToHTableMap.entrySet()) { - HTableInterface hTable = entry.getValue(); - try { - hTable.close(); - } - catch (IOException e) { - if (sqlE != null) { - sqlE.setNextException(ServerUtil.parseServerException(e)); - } else { - sqlE = ServerUtil.parseServerException(e); - } - } - finally { - if (sqlE != null) { - throw sqlE; - } - } - } - } - - public void clear(PhoenixConnection connection) throws SQLException { + public void clear() throws SQLException { this.mutations.clear(); numRows = 0; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 91aa573..5beb2e1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -86,7 +86,9 @@ public class TableResultIterator extends ExplainTable implements ResultIterator PTable table = tableRef.getTable(); HTableInterface htable = context.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes()); if (table.isTransactional()) { - htable = new TransactionAwareHTable(htable); + TransactionAwareHTable txnAware = new TransactionAwareHTable(htable); + context.getConnection().addTxParticipant(txnAware); + htable = txnAware; } this.htable = htable; if (creationMode == ScannerCreation.IMMEDIATE) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index a24f76b..613d3e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -57,7 +57,7 @@ import javax.annotation.Nullable; import co.cask.tephra.TransactionAware; import co.cask.tephra.TransactionContext; import co.cask.tephra.TransactionFailureException; -import co.cask.tephra.distributed.TransactionServiceClient; +import co.cask.tephra.TransactionSystemClient; import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.call.CallRunner; @@ -128,7 +128,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private List<SQLCloseable> statements = new ArrayList<SQLCloseable>(); private final Map<PDataType<?>, Format> formatters = new HashMap<>(); private final MutationState mutationState; - private final TransactionServiceClient transactionServiceClient; private final int mutateBatchSize; private final Long scn; private boolean isAutoCommit = false; @@ -139,6 +138,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private final String timestampPattern; private TraceScope traceScope = null; + private TransactionContext txContext; private boolean isClosed = false; private Sampler<?> sampler; private boolean readOnly = false; @@ -247,33 +247,10 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd // setup tracing, if its enabled this.sampler = Tracing.getConfiguredSampler(this); this.customTracingAnnotations = getImmutableCustomTracingAnnotations(); - - //create a transaction service client - /* commenting out for now as this breaks many unit tests - * TODO: Move to ConnectionQueryServicesImpl - Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); - String zkQuorumServersString = ConnectionInfo.getZookeeperConnectionString(url); - ZKClientService zkClientService = ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure( - ZKClientService.Builder.of(zkQuorumServersString) - .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) - .build(), - RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) - ) - ) - ); - zkClientService.startAndWait(); - ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService); - PooledClientProvider pooledClientProvider = new PooledClientProvider( - config, zkDiscoveryService); - this.transactionServiceClient = new TransactionServiceClient(config,pooledClientProvider); - */ - this.transactionServiceClient = null; } public TransactionContext getTransactionContext() { - return null; // TODO + return txContext; } private ImmutableMap<String, String> getImmutableCustomTracingAnnotations() { @@ -430,7 +407,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd // from modifying this list. this.statements = Lists.newArrayList(); try { - mutationState.clear(this); + mutationState.clear(); } finally { try { SQLCloseables.closeAll(statements); @@ -459,24 +436,42 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd } } + public void startTransaction() throws SQLException { + if (txContext == null) { + try { + TransactionSystemClient txServiceClient = this.getQueryServices().getTransactionSystemClient(); + this.txContext = new TransactionContext(txServiceClient); + txContext.start(); + } catch (TransactionFailureException e) { + throw new SQLException(e); // TODO: error code + } + } + } + + public void addTxParticipant(TransactionAware txnAware) { + txContext.addTransactionAware(txnAware); + } + + private boolean isTransactionStarted() { + return txContext != null; + } + + private void endTransaction() { + txContext = null; + } + @Override public void commit() throws SQLException { CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() { @Override public Void call() throws SQLException { - List<TransactionAware> txAwareHTables = mutationState.preSend(); - if (txAwareHTables.isEmpty()) { - mutationState.send(); - } - else { - TransactionContext transactionContext = new TransactionContext(transactionServiceClient, txAwareHTables); + mutationState.send(); + if (isTransactionStarted()) { try { - transactionContext.start(); - mutationState.send(); - transactionContext.finish(); + txContext.finish(); } catch (TransactionFailureException e) { try { - transactionContext.abort(); + txContext.abort(); throw new SQLExceptionInfo.Builder( SQLExceptionCode.TRANSACTION_FINISH_EXCEPTION) .setRootCause(e).build().buildException(); @@ -485,12 +480,13 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd SQLExceptionCode.TRANSACTION_ABORT_EXCEPTION) .setRootCause(e1).build().buildException(); } + } finally { + endTransaction(); } - } - mutationState.postSend(); + } return null; } - }, Tracing.withTracing(this, "committing mutations")); + }, Tracing.withTracing(this, "sending mutations")); } @Override @@ -690,7 +686,16 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd @Override public void rollback() throws SQLException { - mutationState.clear(this); + mutationState.clear(); + if (isTransactionStarted()) { + try { + txContext.abort(); + } catch (TransactionFailureException e) { + throw new SQLException(e); // TODO: error code + } finally { + endTransaction(); + } + } } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java index 6360d06..c45a35d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java @@ -147,7 +147,7 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver { ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(normalizedConnInfo); if (connectionQueryServices == null) { if (normalizedConnInfo.isConnectionless()) { - connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo); + connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo, info); } else { connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo, info); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java index 4a65b76..ff25fae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java @@ -251,11 +251,6 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni return new ConnectionInfo(quorum,port,rootNode, principal, keytabFile); } - public static String getZookeeperConnectionString(String url) throws SQLException { - ConnectionInfo connInfo = ConnectionInfo.create(url); - return connInfo.getZookeeperQuorum()+":"+connInfo.getPort(); - } - public ConnectionInfo normalize(ReadOnlyProps props) throws SQLException { String zookeeperQuorum = this.getZookeeperQuorum(); Integer port = this.getPort(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 746c0b7..2c0021f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -236,9 +236,13 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE); plan = connection.getQueryServices().getOptimizer().optimize( PhoenixStatement.this, plan); + startTransaction(plan); // this will create its own trace internally, so we don't wrap this // whole thing in tracing ResultIterator resultIterator = plan.iterator(); + if (connection.getAutoCommit()) { + connection.commit(); // Forces new read point for next statement + } if (logger.isDebugEnabled()) { String explainPlan = QueryUtil.getExplainPlan(resultIterator); logger.debug(LogUtil.addCustomAnnotations("Explain plan: " + explainPlan, connection)); @@ -271,6 +275,15 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } } + private void startTransaction(StatementPlan plan) throws SQLException { + for (TableRef ref : plan.getContext().getResolver().getTables()) { + if (ref.getTable().isTransactional()) { + connection.startTransaction(); + break; + } + } + } + protected int executeMutation(final CompilableStatement stmt) throws SQLException { if (connection.isReadOnly()) { throw new SQLExceptionInfo.Builder( @@ -289,6 +302,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho // the latest state try { MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE); + startTransaction(plan); MutationState state = plan.execute(); connection.getMutationState().join(state); if (connection.getAutoCommit()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 09705c6..57b5ac6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import co.cask.tephra.TransactionSystemClient; + import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -111,4 +113,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public void clearCache() throws SQLException; public int getSequenceSaltBuckets(); + + TransactionSystemClient getTransactionSystemClient(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 8a1671b..4b23d10 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -42,6 +42,9 @@ import java.util.concurrent.TimeoutException; import javax.annotation.concurrent.GuardedBy; +import co.cask.tephra.TransactionSystemClient; +import co.cask.tephra.distributed.PooledClientProvider; +import co.cask.tephra.distributed.TransactionServiceClient; import co.cask.tephra.hbase98.coprocessor.TransactionProcessor; import org.apache.hadoop.conf.Configuration; @@ -151,6 +154,11 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.UpgradeUtil; +import org.apache.twill.discovery.ZKDiscoveryService; +import org.apache.twill.zookeeper.RetryStrategies; +import org.apache.twill.zookeeper.ZKClientService; +import org.apache.twill.zookeeper.ZKClientServices; +import org.apache.twill.zookeeper.ZKClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,6 +181,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // Max number of cached table stats for view or shared index physical tables private static final int MAX_TABLE_STATS_CACHE_ENTRIES = 512; protected final Configuration config; + private final ConnectionInfo connectionInfo; // Copy of config.getProps(), but read-only to prevent synchronization that we // don't need. private final ReadOnlyProps props; @@ -194,6 +203,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final Object connectionCountLock = new Object(); private HConnection connection; + private TransactionServiceClient txServiceClient; private volatile boolean initialized; private volatile int nSequenceSaltBuckets; @@ -245,6 +255,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement for (Entry<String,String> entry : connectionInfo.asProps()) { config.set(entry.getKey(), entry.getValue()); } + this.connectionInfo = connectionInfo; // Without making a copy of the configuration we cons up, we lose some of our properties // on the server side during testing. @@ -269,6 +280,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement .build(); } + @Override + public TransactionSystemClient getTransactionSystemClient() { + return txServiceClient; + } + + private void initTxServiceClient() { + String zkQuorumServersString = connectionInfo.getZookeeperQuorum(); + ZKClientService zkClientService = ZKClientServices.delegate( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + ZKClientService.Builder.of(zkQuorumServersString) + .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) + .build(), + RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) + ) + ) + ); + zkClientService.startAndWait(); + ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService); + PooledClientProvider pooledClientProvider = new PooledClientProvider( + config, zkDiscoveryService); + this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider); + } + private void openConnection() throws SQLException { try { // check if we need to authenticate with kerberos @@ -280,6 +315,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement User.login(config, HBASE_CLIENT_KEYTAB, HBASE_CLIENT_PRINCIPAL, null); logger.info("Successfull login to secure cluster!!"); } + initTxServiceClient(); this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config); } catch (IOException e) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION) http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 742c38e..82b0e1a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -23,9 +23,15 @@ import java.sql.SQLException; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import co.cask.tephra.TransactionManager; +import co.cask.tephra.TransactionSystemClient; +import co.cask.tephra.inmemory.InMemoryTxSystemClient; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -95,17 +101,37 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple private PMetaData metaData; private final Map<SequenceKey, SequenceInfo> sequenceMap = Maps.newHashMap(); private final String userName; + private final TransactionSystemClient txSystemClient; private KeyValueBuilder kvBuilder; private volatile boolean initialized; private volatile SQLException initializationException; private final Map<String, List<HRegionLocation>> tableSplits = Maps.newHashMap(); - public ConnectionlessQueryServicesImpl(QueryServices queryServices, ConnectionInfo connInfo) { - super(queryServices); + public ConnectionlessQueryServicesImpl(QueryServices services, ConnectionInfo connInfo, Properties info) { + super(services); userName = connInfo.getPrincipal(); metaData = newEmptyMetaData(); // Use KeyValueBuilder that builds real KeyValues, as our test utils require this this.kvBuilder = GenericKeyValueBuilder.INSTANCE; + // TOOD: copy/paste from ConnectionQueryServicesImpl + Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); + for (Entry<String,String> entry : services.getProps()) { + config.set(entry.getKey(), entry.getValue()); + } + if (info != null) { + for (Object key : info.keySet()) { + config.set((String) key, info.getProperty((String) key)); + } + } + for (Entry<String,String> entry : connInfo.asProps()) { + config.set(entry.getKey(), entry.getValue()); + } + + // Without making a copy of the configuration we cons up, we lose some of our properties + // on the server side during testing. + config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config); + TransactionManager txnManager = new TransactionManager(config); + this.txSystemClient = new InMemoryTxSystemClient(txnManager); } private PMetaData newEmptyMetaData() { @@ -479,5 +505,10 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple return getProps().getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); } + + @Override + public TransactionSystemClient getTransactionSystemClient() { + return txSystemClient; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index e2c9544..cf593e2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import co.cask.tephra.TransactionSystemClient; + import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -250,4 +252,9 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple public int getSequenceSaltBuckets() { return getDelegate().getSequenceSaltBuckets(); } + + @Override + public TransactionSystemClient getTransactionSystemClient() { + return getDelegate().getTransactionSystemClient(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1c2306d/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java index 0d3c461..0b6573c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java @@ -89,9 +89,9 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver { if (connectionQueryServices != null) { return connectionQueryServices; } ConnectionInfo connInfo = ConnectionInfo.create(url); if (connInfo.isConnectionless()) { - connectionQueryServices = new ConnectionlessQueryServicesImpl(queryServices, connInfo); + connectionQueryServices = new ConnectionlessQueryServicesImpl(queryServices, connInfo, info); } else { - connectionQueryServices = new ConnectionQueryServicesTestImpl(queryServices, connInfo); + connectionQueryServices = new ConnectionQueryServicesTestImpl(queryServices, connInfo, info); } connectionQueryServices.init(url, info); return connectionQueryServices;