Repository: phoenix Updated Branches: refs/heads/txn d49c3bf6f -> bd271b214
Moved TransactionContext from MutationState to PhoenixConnection Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bd271b21 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bd271b21 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bd271b21 Branch: refs/heads/txn Commit: bd271b214dab942631798ea77688e51f9442ef18 Parents: d49c3bf Author: Thomas D'Silva <twdsi...@gmail.com> Authored: Thu Mar 12 00:46:33 2015 -0700 Committer: Thomas D'Silva <twdsi...@gmail.com> Committed: Thu Mar 12 00:46:33 2015 -0700 ---------------------------------------------------------------------- .../apache/phoenix/execute/MutationState.java | 198 ++++++++----------- .../apache/phoenix/jdbc/PhoenixConnection.java | 67 ++++++- 2 files changed, 143 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd271b21/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 6e37cc5..45b9dd6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -27,9 +27,12 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.Map.Entry; +import java.util.Set; + +import co.cask.tephra.TransactionAware; +import co.cask.tephra.hbase98.TransactionAwareHTable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -39,15 +42,12 @@ import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; -import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.monitoring.PhoenixMetrics; -import org.apache.phoenix.query.HBaseFactoryProvider; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.MetaDataClient; @@ -63,24 +63,11 @@ import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.ServerUtil; -import org.apache.twill.discovery.ZKDiscoveryService; -import org.apache.twill.zookeeper.RetryStrategies; -import org.apache.twill.zookeeper.ZKClientService; -import org.apache.twill.zookeeper.ZKClientServices; -import org.apache.twill.zookeeper.ZKClients; import org.cloudera.htrace.Span; import org.cloudera.htrace.TraceScope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import co.cask.tephra.TransactionAware; -import co.cask.tephra.TransactionContext; -import co.cask.tephra.TransactionFailureException; -import co.cask.tephra.distributed.PooledClientProvider; -import co.cask.tephra.distributed.TransactionServiceClient; -import co.cask.tephra.hbase98.TransactionAwareHTable; - -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -96,53 +83,45 @@ public class MutationState implements SQLCloseable { private static final Logger logger = LoggerFactory.getLogger(MutationState.class); private PhoenixConnection connection; - private final TransactionServiceClient transactionServiceClient; private final long maxSize; private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); - private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations; // TODO: Sizing? + // map from table to rows + // rows - map from rowkey to columns + // columns - map from column to value + private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing? + // map from table ref to htable (possibly a TransactionAwareHTable) + private Map<TableRef, HTableInterface> tableRefToHTableMap; private long sizeOffset; - private int numRows; + private int numRows = 0; - public MutationState(int maxSize, PhoenixConnection connection) throws SQLException { + public MutationState(int maxSize, PhoenixConnection connection) { this(maxSize,connection,0); } - public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) throws SQLException { - this(maxSize, connection, sizeOffset, Maps.<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>newHashMapWithExpectedSize(3), 0); + public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) { + this.maxSize = maxSize; + this.connection = connection; + this.sizeOffset = sizeOffset; } - public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) throws SQLException { - this(maxSize, connection, sizeOffset, Maps.newHashMap(ImmutableMap.of(table, mutations)), mutations.size()); + public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { + this.maxSize = maxSize; + this.connection = connection; + this.mutations.put(table, mutations); + this.sizeOffset = sizeOffset; + this.numRows = mutations.size(); + throwIfTooBig(); } - public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset, Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations, int numRows) throws SQLException { + private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) { this.maxSize = maxSize; this.connection = connection; this.sizeOffset = sizeOffset; - this.mutations = mutations; - this.numRows = numRows; + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) { + numRows += entry.getValue().size(); + this.mutations.put(entry.getKey(), entry.getValue()); + } throwIfTooBig(); - - //create a transaction service client -// Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); -// String zkQuorumServersString = ConnectionInfo.getZookeeperConnectionString(connection.getURL()); -// ZKClientService zkClientService = ZKClientServices.delegate( -// ZKClients.reWatchOnExpire( -// ZKClients.retryOnFailure( -// ZKClientService.Builder.of(zkQuorumServersString) -// .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) -// .build(), -// RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) -// ) -// ) -// ); -// zkClientService.startAndWait(); -// ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService); -// PooledClientProvider pooledClientProvider = new PooledClientProvider( -// config, zkDiscoveryService); -// this.transactionServiceClient = new TransactionServiceClient(config, -// pooledClientProvider); - this.transactionServiceClient = null; } private void throwIfTooBig() { @@ -381,49 +360,31 @@ public class MutationState implements SQLCloseable { } } - @SuppressWarnings("deprecation") - public void commit() throws SQLException { - // create list of transaction aware htables - List<TransactionAware> txAwareHTables = Lists.newArrayListWithExpectedSize(mutations.size()); - // create list of htables (some of which could be transactional) - List<HTableInterface> hTables = Lists.newArrayListWithExpectedSize(mutations.size()); - for ( TableRef tableRef : this.mutations.keySet()) { - PTable table = tableRef.getTable(); - byte[] hTableName = table.getPhysicalName().getBytes(); - HTableInterface hTable = connection.getQueryServices().getTable(hTableName); - if (table.isTransactional()) { - TransactionAwareHTable transactionAwareHTable = new TransactionAwareHTable(hTable); - txAwareHTables.add(transactionAwareHTable); - hTable = transactionAwareHTable; - } - hTables.add(hTable); - } - - if (txAwareHTables.isEmpty()) { - commitMutations(hTables); - } - else { - TransactionContext transactionContext = new TransactionContext(transactionServiceClient, txAwareHTables); - try { - transactionContext.start(); - commitMutations(hTables); - transactionContext.finish(); - } catch (TransactionFailureException e) { - try { - transactionContext.abort(); - throw new SQLExceptionInfo.Builder( - SQLExceptionCode.TRANSACTION_FINISH_EXCEPTION) - .setRootCause(e).build().buildException(); - } catch (TransactionFailureException e1) { - throw new SQLExceptionInfo.Builder( - SQLExceptionCode.TRANSACTION_ABORT_EXCEPTION) - .setRootCause(e1).build().buildException(); - } - } - } + /** + * Creates a map from table ref to htable which is used by {@link PhoenixConnection} + * for transactions + * @return list of transaction aware htables + */ + public List<TransactionAware> preSend() throws SQLException { + List<TransactionAware> txAwareHTables = Lists.newArrayListWithExpectedSize(mutations.size()); + tableRefToHTableMap = Maps.newHashMapWithExpectedSize(mutations.size()); + for ( TableRef tableRef : this.mutations.keySet()) { + PTable table = tableRef.getTable(); + byte[] hTableName = table.getPhysicalName().getBytes(); + HTableInterface hTable = connection.getQueryServices().getTable(hTableName); + if (table.isTransactional()) { + TransactionAwareHTable transactionAwareHTable = new TransactionAwareHTable(hTable); + txAwareHTables.add(transactionAwareHTable); + hTable = transactionAwareHTable; + } + tableRefToHTableMap.put(tableRef, hTable); + } + return txAwareHTables; } - public void commitMutations(List<HTableInterface> hTables) throws SQLException { + + @SuppressWarnings("deprecation") + public void send() throws SQLException { int i = 0; byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes(); long[] serverTimeStamps = validate(); @@ -486,10 +447,7 @@ public class MutationState implements SQLCloseable { } SQLException sqlE = null; - HTableInterface hTable = connection.getQueryServices().getTable(htableName); - if (table.isTransactional()) { - hTable = new TransactionAwareHTable(hTable); - } + HTableInterface hTable = tableRefToHTableMap.get(tableRef); try { logMutationSize(hTable, mutations, connection); MUTATION_BATCH_SIZE.update(mutations.size()); @@ -524,31 +482,15 @@ public class MutationState implements SQLCloseable { } // Throw to client with both what was committed so far and what is left to be committed. // That way, client can either undo what was done or try again with what was not done. - int numCommitedMutations = 0; - Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> commitedMutations = Maps.newHashMapWithExpectedSize(committedList.size()); - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> committedEntry : committedList) { - numCommitedMutations += committedEntry.getValue().size(); - commitedMutations.put(committedEntry.getKey(), committedEntry.getValue()); - } - sqlE = new CommitException(e, this, new MutationState(this.maxSize, this.connection, this.sizeOffset, commitedMutations, numCommitedMutations)); + sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection)); } finally { try { - hTable.close(); - } catch (IOException e) { - if (sqlE != null) { - sqlE.setNextException(ServerUtil.parseServerException(e)); - } else { - sqlE = ServerUtil.parseServerException(e); + if (cache != null) { + cache.close(); } } finally { - try { - if (cache != null) { - cache.close(); - } - } finally { - if (sqlE != null) { - throw sqlE; - } + if (sqlE != null) { + throw sqlE; } } } @@ -565,7 +507,29 @@ public class MutationState implements SQLCloseable { assert(this.mutations.isEmpty()); } - public void rollback(PhoenixConnection connection) throws SQLException { + public void postSend() throws SQLException { + SQLException sqlE = null; + for (Entry<TableRef, HTableInterface> entry : tableRefToHTableMap.entrySet()) { + HTableInterface hTable = entry.getValue(); + try { + hTable.close(); + } + catch (IOException e) { + if (sqlE != null) { + sqlE.setNextException(ServerUtil.parseServerException(e)); + } else { + sqlE = ServerUtil.parseServerException(e); + } + } + finally { + if (sqlE != null) { + throw sqlE; + } + } + } + } + + public void clear(PhoenixConnection connection) throws SQLException { this.mutations.clear(); numRows = 0; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd271b21/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index d5687f0..b5c9a2d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -51,9 +51,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.call.CallRunner; import org.apache.phoenix.exception.SQLExceptionCode; @@ -61,9 +63,11 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.expression.function.FunctionArgumentType; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.DelegateConnectionQueryServices; +import org.apache.phoenix.query.HBaseFactoryProvider; import org.apache.phoenix.query.MetaDataMutated; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; @@ -92,9 +96,20 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; +import org.apache.twill.discovery.ZKDiscoveryService; +import org.apache.twill.zookeeper.RetryStrategies; +import org.apache.twill.zookeeper.ZKClientService; +import org.apache.twill.zookeeper.ZKClientServices; +import org.apache.twill.zookeeper.ZKClients; import org.cloudera.htrace.Sampler; import org.cloudera.htrace.TraceScope; +import co.cask.tephra.TransactionAware; +import co.cask.tephra.TransactionContext; +import co.cask.tephra.TransactionFailureException; +import co.cask.tephra.distributed.PooledClientProvider; +import co.cask.tephra.distributed.TransactionServiceClient; + import com.google.common.base.Objects; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; @@ -123,6 +138,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private List<SQLCloseable> statements = new ArrayList<SQLCloseable>(); private final Map<PDataType<?>, Format> formatters = new HashMap<>(); private final MutationState mutationState; + private final TransactionServiceClient transactionServiceClient; private final int mutateBatchSize; private final Long scn; private boolean isAutoCommit = false; @@ -241,7 +257,25 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd // setup tracing, if its enabled this.sampler = Tracing.getConfiguredSampler(this); this.customTracingAnnotations = getImmutableCustomTracingAnnotations(); - + + //create a transaction service client + Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); + String zkQuorumServersString = ConnectionInfo.getZookeeperConnectionString(url); + ZKClientService zkClientService = ZKClientServices.delegate( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + ZKClientService.Builder.of(zkQuorumServersString) + .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) + .build(), + RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) + ) + ) + ); + zkClientService.startAndWait(); + ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService); + PooledClientProvider pooledClientProvider = new PooledClientProvider( + config, zkDiscoveryService); + this.transactionServiceClient = new TransactionServiceClient(config,pooledClientProvider); } private ImmutableMap<String, String> getImmutableCustomTracingAnnotations() { @@ -398,7 +432,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd // from modifying this list. this.statements = Lists.newArrayList(); try { - mutationState.rollback(this); + mutationState.clear(this); } finally { try { SQLCloseables.closeAll(statements); @@ -426,13 +460,36 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd isClosed = true; } } - + @Override public void commit() throws SQLException { CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() { @Override public Void call() throws SQLException { - mutationState.commit(); + List<TransactionAware> txAwareHTables = mutationState.preSend(); + if (txAwareHTables.isEmpty()) { + mutationState.send(); + } + else { + TransactionContext transactionContext = new TransactionContext(transactionServiceClient, txAwareHTables); + try { + transactionContext.start(); + mutationState.send(); + transactionContext.finish(); + } catch (TransactionFailureException e) { + try { + transactionContext.abort(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FINISH_EXCEPTION) + .setRootCause(e).build().buildException(); + } catch (TransactionFailureException e1) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_ABORT_EXCEPTION) + .setRootCause(e1).build().buildException(); + } + } + } + mutationState.postSend(); return null; } }, Tracing.withTracing(this, "committing mutations")); @@ -635,7 +692,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd @Override public void rollback() throws SQLException { - mutationState.rollback(this); + mutationState.clear(this); } @Override