Saving partial results
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fa69563e Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fa69563e Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fa69563e Branch: refs/heads/omid Commit: fa69563e51fbebdf14d5af610506dd56b8289ec4 Parents: d2c1653 Author: Ohad Shacham <[email protected]> Authored: Mon Mar 13 12:22:51 2017 +0200 Committer: Ohad Shacham <[email protected]> Committed: Mon Mar 13 12:22:51 2017 +0200 ---------------------------------------------------------------------- .../apache/phoenix/execute/MutationState.java | 309 +++++++------------ .../transaction/OmidTransactionContext.java | 15 +- .../transaction/PhoenixTransactionContext.java | 27 +- .../transaction/TephraTransactionContext.java | 112 ++++--- .../transaction/TephraTransactionTable.java | 7 +- .../apache/phoenix/util/TransactionUtil.java | 22 +- 6 files changed, 230 insertions(+), 262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa69563e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 4775d59..c480e30 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -84,6 +84,10 @@ import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; +import org.apache.phoenix.transaction.PhoenixTransactionalTable; +import org.apache.phoenix.transaction.TephraTransactionContext; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; @@ -123,54 +127,53 @@ public class MutationState implements SQLCloseable { private static final TransactionCodec CODEC = new TransactionCodec(); private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0]; private static final int MAX_COMMIT_RETRIES = 3; - + private final PhoenixConnection connection; private final long maxSize; private final long maxSizeBytes; private long batchCount = 0L; private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations; - private final List<TransactionAware> txAwares; - private final TransactionContext txContext; private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10); - - private Transaction tx; + private long sizeOffset; private int numRows = 0; private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; private boolean isExternalTxContext = false; private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); - + + final PhoenixTransactionContext phoenixTransactionContext; + private final MutationMetricQueue mutationMetricQueue; private ReadMetricQueue readMetricQueue; public MutationState(long maxSize, PhoenixConnection connection) { - this(maxSize,connection, null, null); + this(maxSize,connection, false, null); } - - public MutationState(long maxSize, PhoenixConnection connection, TransactionContext txContext) { - this(maxSize,connection, null, txContext); + + public MutationState(long maxSize, PhoenixConnection connection, PhoenixTransactionContext txContext) { + this(maxSize,connection, false, txContext); } - + public MutationState(MutationState mutationState) { - this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction(), null); + this(mutationState.maxSize, mutationState.connection, true, mutationState.getPhoenixTransactionContext()); } - + public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) { - this(maxSize, connection, null, null, sizeOffset); + this(maxSize, connection, false, null, sizeOffset); } - - private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext) { - this(maxSize,connection, tx, txContext, 0); + + private MutationState(long maxSize, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext) { + this(maxSize,connection, subTask, txContext, 0); } - - private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) { - this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext); + + private MutationState(long maxSize, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) { + this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), subTask, txContext); this.sizeOffset = sizeOffset; } - + MutationState(long maxSize, PhoenixConnection connection, Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, - Transaction tx, TransactionContext txContext) { + boolean subTask, PhoenixTransactionContext txContext) { this.maxSize = maxSize; this.connection = connection; this.maxSizeBytes = connection.getMutateBatchSizeBytes(); @@ -178,30 +181,24 @@ public class MutationState implements SQLCloseable { boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled(); this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue() : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE; - this.tx = tx; - if (tx == null) { - this.txAwares = Collections.emptyList(); + if (subTask == false) { if (txContext == null) { - TransactionSystemClient txServiceClient = this.connection - .getQueryServices().getTransactionSystemClient(); - this.txContext = new TransactionContext(txServiceClient); + phoenixTransactionContext = new TephraTransactionContext(connection); } else { isExternalTxContext = true; - this.txContext = txContext; + phoenixTransactionContext = new TephraTransactionContext(txContext, connection, subTask); } } else { // this code path is only used while running child scans, we can't pass the txContext to child scans // as it is not thread safe, so we use the tx member variable - this.txAwares = Lists.newArrayList(); - this.txContext = null; + phoenixTransactionContext = new TephraTransactionContext(txContext, connection, subTask); } } public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { - this(maxSize, connection, null, null, sizeOffset); + this(maxSize, connection, true, connection.getMutationState().getPhoenixTransactionContext(), sizeOffset); this.mutations.put(table, mutations); this.numRows = mutations.size(); - this.tx = connection.getMutationState().getTransaction(); throwIfTooBig(); } @@ -209,6 +206,10 @@ public class MutationState implements SQLCloseable { return maxSize; } + public PhoenixTransactionContext getPhoenixTransactionContext() { + return phoenixTransactionContext; + } + /** * Commit a write fence when creating an index so that we can detect * when a data table transaction is started before the create index @@ -219,33 +220,16 @@ public class MutationState implements SQLCloseable { * @param dataTable the data table upon which an index is being added * @throws SQLException */ - public void commitDDLFence(PTable dataTable) throws SQLException { + public void commitDDLFence(PTable dataTable, Logger logger) throws SQLException { if (dataTable.isTransactional()) { - byte[] key = dataTable.getName().getBytes(); - boolean success = false; try { - FenceWait fenceWait = VisibilityFence.prepareWait(key, connection.getQueryServices().getTransactionSystemClient()); - fenceWait.await(10000, TimeUnit.MILLISECONDS); - success = true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException(); - } catch (TimeoutException | TransactionFailureException e) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE) - .setSchemaName(dataTable.getSchemaName().getString()) - .setTableName(dataTable.getTableName().getString()) - .build().buildException(); + phoenixTransactionContext.commitDDLFence(dataTable, logger); } finally { // The client expects a transaction to be in progress on the txContext while the // VisibilityFence.prepareWait() starts a new tx and finishes/aborts it. After it's // finished, we start a new one here. // TODO: seems like an autonomous tx capability in Tephra would be useful here. - try { - txContext.start(); - if (logger.isInfoEnabled() && success) logger.info("Added write fence at ~" + getTransaction().getReadPointer()); - } catch (TransactionFailureException e) { - throw TransactionUtil.getTransactionFailureException(e); - } + phoenixTransactionContext.begin(); } } } @@ -262,27 +246,12 @@ public class MutationState implements SQLCloseable { if (table.getType() == PTableType.INDEX || !table.isTransactional()) { return; } - byte[] logicalKey = table.getName().getBytes(); - TransactionAware logicalTxAware = VisibilityFence.create(logicalKey); - if (this.txContext == null) { - this.txAwares.add(logicalTxAware); - } else { - this.txContext.addTransactionAware(logicalTxAware); - } - byte[] physicalKey = table.getPhysicalName().getBytes(); - if (Bytes.compareTo(physicalKey, logicalKey) != 0) { - TransactionAware physicalTxAware = VisibilityFence.create(physicalKey); - if (this.txContext == null) { - this.txAwares.add(physicalTxAware); - } else { - this.txContext.addTransactionAware(physicalTxAware); - } - } + + phoenixTransactionContext.markDMLFence(table); } public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException { - Transaction currentTx = getTransaction(); - if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) { + if (! phoenixTransactionContext.isTransactionRunning() || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) { return false; } Set<TableRef> sources = plan.getSourceRefs(); @@ -322,40 +291,14 @@ public class MutationState implements SQLCloseable { break; } } - if (hasUncommittedData) { - try { - if (txContext == null) { - currentTx = tx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx); - } else { - txContext.checkpoint(); - currentTx = tx = txContext.getCurrentTransaction(); - } - // Since we've checkpointed, we can clear out uncommitted set, since a statement run afterwards - // should see all this data. - uncommittedPhysicalNames.clear(); - } catch (TransactionFailureException e) { - throw new SQLException(e); - } - } - // Since we're querying our own table while mutating it, we must exclude - // see our current mutations, otherwise we can get erroneous results (for DELETE) - // or get into an infinite loop (for UPSERT SELECT). - currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); + + phoenixTransactionContext.checkpoint(hasUncommittedData); + return true; } return false; } - - private void addTransactionParticipant(TransactionAware txAware) throws SQLException { - if (txContext == null) { - txAwares.add(txAware); - assert(tx != null); - txAware.startTx(tx); - } else { - txContext.addTransactionAware(txAware); - } - } - + // Though MutationState is not thread safe in general, this method should be because it may // be called by TableResultIterator in a multi-threaded manner. Since we do not want to expose // the Transaction outside of MutationState, this seems reasonable, as the member variables @@ -372,68 +315,52 @@ public class MutationState implements SQLCloseable { } return htable; } - + public PhoenixConnection getConnection() { return connection; } - - // Kept private as the Transaction may change when check pointed. Keeping it private ensures - // no one holds on to a stale copy. - private Transaction getTransaction() { - return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null; - } - + public boolean isTransactionStarted() { - return getTransaction() != null; + return phoenixTransactionContext.isTransactionRunning(); } - + public long getInitialWritePointer() { - Transaction tx = getTransaction(); - return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getTransactionId(); // First write pointer - won't change with checkpointing + return phoenixTransactionContext.getTransactionId(); // First write pointer - won't change with checkpointing } - + // For testing public long getWritePointer() { - Transaction tx = getTransaction(); - return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getWritePointer(); + return phoenixTransactionContext.getWritePointer(); } - + // For testing - public VisibilityLevel getVisibilityLevel() { - Transaction tx = getTransaction(); - return tx == null ? null : tx.getVisibilityLevel(); + public PhoenixVisibilityLevel getVisibilityLevel() { + return phoenixTransactionContext.getVisibilityLevel(); } - + public boolean startTransaction() throws SQLException { - if (txContext == null) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException(); - } - if (connection.getSCN() != null) { throw new SQLExceptionInfo.Builder( SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET) .build().buildException(); } - - try { - if (!isTransactionStarted()) { - // Clear any transactional state in case transaction was ended outside - // of Phoenix so we don't carry the old transaction state forward. We - // cannot call reset() here due to the case of having mutations and - // then transitioning from non transactional to transactional (which - // would end up clearing our uncommitted state). - resetTransactionalState(); - txContext.start(); - return true; - } - } catch (TransactionFailureException e) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED).setRootCause(e).build().buildException(); + + if (!isTransactionStarted()) { + // Clear any transactional state in case transaction was ended outside + // of Phoenix so we don't carry the old transaction state forward. We + // cannot call reset() here due to the case of having mutations and + // then transitioning from non transactional to transactional (which + // would end up clearing our uncommitted state). + resetTransactionalState(); + phoenixTransactionContext.begin(); + return true; } + return false; } public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) { - MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null, null); + MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), false, null); state.sizeOffset = 0; return state; } @@ -512,13 +439,9 @@ public class MutationState implements SQLCloseable { if (this == newMutationState) { // Doesn't make sense return; } - if (txContext != null) { - for (TransactionAware txAware : newMutationState.txAwares) { - txContext.addTransactionAware(txAware); - } - } else { - txAwares.addAll(newMutationState.txAwares); - } + + phoenixTransactionContext.join(getPhoenixTransactionContext()); + this.sizeOffset += newMutationState.sizeOffset; joinMutationState(newMutationState.mutations, this.mutations); if (!newMutationState.txMutations.isEmpty()) { @@ -535,7 +458,7 @@ public class MutationState implements SQLCloseable { } throwIfTooBig(); } - + private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) { RowKeySchema schema = table.getRowKeySchema(); @@ -1054,24 +977,15 @@ public class MutationState implements SQLCloseable { txTableRefs.add(origTableRef); addDMLFence(table); uncommittedPhysicalNames.add(table.getPhysicalName().getString()); - + // If we have indexes, wrap the HTable in a delegate HTable that // will attach the necessary index meta data in the event of a // rollback if (!table.getIndexes().isEmpty()) { hTable = new MetaDataAwareHTable(hTable, origTableRef); } - TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table.isImmutableRows()); - // Don't add immutable indexes (those are the only ones that would participate - // during a commit), as we don't need conflict detection for these. - if (tableInfo.isDataTable()) { - // Even for immutable, we need to do this so that an abort has the state - // necessary to generate the rows to delete. - addTransactionParticipant(txnAware); - } else { - txnAware.startTx(getTransaction()); - } - hTable = txnAware; + + hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table.isImmutableRows()); } long numMutations = mutationList.size(); @@ -1261,29 +1175,22 @@ public class MutationState implements SQLCloseable { this.mutations.clear(); resetTransactionalState(); } - + private void resetTransactionalState() { - tx = null; - txAwares.clear(); + phoenixTransactionContext.reset(); txMutations = Collections.emptyMap(); uncommittedPhysicalNames.clear(); uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; } - + public void rollback() throws SQLException { try { - if (txContext != null && isTransactionStarted()) { - try { - txContext.abort(); - } catch (TransactionFailureException e) { - throw TransactionUtil.getTransactionFailureException(e); - } - } + phoenixTransactionContext.abort(); } finally { resetState(); } } - + public void commit() throws SQLException { Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); int retryCount = 0; @@ -1299,38 +1206,32 @@ public class MutationState implements SQLCloseable { sqlE = e; } finally { try { - if (txContext != null && isTransactionStarted()) { - TransactionFailureException txFailure = null; - boolean finishSuccessful=false; - try { - if (sendSuccessful) { - txContext.finish(); - finishSuccessful = true; - } - } catch (TransactionFailureException e) { - if (logger.isInfoEnabled()) logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + " with retry count of " + retryCount); - retryCommit = (e instanceof TransactionConflictException && retryCount < MAX_COMMIT_RETRIES); - txFailure = e; - SQLException nextE = TransactionUtil.getTransactionFailureException(e); - if (sqlE == null) { - sqlE = nextE; - } else { - sqlE.setNextException(nextE); - } - } finally { - // If send fails or finish fails, abort the tx - if (!finishSuccessful) { - try { - txContext.abort(txFailure); - if (logger.isInfoEnabled()) logger.info("Abort successful"); - } catch (TransactionFailureException e) { - if (logger.isInfoEnabled()) logger.info("Abort failed with " + e); - SQLException nextE = TransactionUtil.getTransactionFailureException(e); - if (sqlE == null) { - sqlE = nextE; - } else { - sqlE.setNextException(nextE); - } + boolean finishSuccessful=false; + try { + if (sendSuccessful) { + phoenixTransactionContext.commit(); + finishSuccessful = true; + } + } catch (SQLException e) { + if (logger.isInfoEnabled()) logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + " with retry count of " + retryCount); + retryCommit = (e.getErrorCode() == SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode() && retryCount < MAX_COMMIT_RETRIES); + if (sqlE == null) { + sqlE = e; + } else { + sqlE.setNextException(e); + } + } finally { + // If send fails or finish fails, abort the tx + if (!finishSuccessful) { + try { + phoenixTransactionContext.abort(); + if (logger.isInfoEnabled()) logger.info("Abort successful"); + } catch (SQLException e) { + if (logger.isInfoEnabled()) logger.info("Abort failed with " + e); + if (sqlE == null) { + sqlE = e; + } else { + sqlE.setNextException(e); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa69563e/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index 937ac14..596cf73 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -4,6 +4,7 @@ import java.sql.SQLException; import java.util.concurrent.TimeoutException; import org.apache.phoenix.schema.PTable; +import org.slf4j.Logger; public class OmidTransactionContext implements PhoenixTransactionContext { @@ -32,8 +33,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext { } @Override - public void commitDDLFence(PTable dataTable) throws SQLException, - InterruptedException, TimeoutException { + public void commitDDLFence(PTable dataTable, Logger logger) throws SQLException { // TODO Auto-generated method stub } @@ -74,4 +74,15 @@ public class OmidTransactionContext implements PhoenixTransactionContext { return 0; } + @Override + public long getWritePointer() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public PhoenixVisibilityLevel getVisibilityLevel() { + // TODO Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa69563e/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java index 87b68f9..2d0d5b7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java @@ -1,6 +1,8 @@ package org.apache.phoenix.transaction; import org.apache.phoenix.schema.PTable; +import org.apache.tephra.Transaction.VisibilityLevel; +import org.slf4j.Logger; import java.sql.SQLException; import java.util.concurrent.TimeoutException; @@ -8,6 +10,17 @@ import java.util.concurrent.TimeoutException; public interface PhoenixTransactionContext { /** + * + * Visibility levels needed for checkpointing and + * + */ + public enum PhoenixVisibilityLevel { + SNAPSHOT, + SNAPSHOT_EXCLUDE_CURRENT, + SNAPSHOT_ALL + } + + /** * Starts a transaction * * @throws SQLException @@ -43,8 +56,8 @@ public interface PhoenixTransactionContext { * @throws InterruptedException * @throws TimeoutException */ - public void commitDDLFence(PTable dataTable) - throws SQLException, InterruptedException, TimeoutException; + public void commitDDLFence(PTable dataTable, Logger logger) + throws SQLException; /** * mark DML with table information for conflict detection of concurrent @@ -80,4 +93,14 @@ public interface PhoenixTransactionContext { * Returns transaction snapshot id */ long getReadPointer(); + + /** + * Returns transaction write pointer. After checkpoint the write pointer is different than the initial one + */ + long getWritePointer(); + + /** + * Returns visibility level + */ + PhoenixVisibilityLevel getVisibilityLevel(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa69563e/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java index 8fc5e0f..f8096d5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java @@ -24,6 +24,8 @@ import org.apache.tephra.visibility.VisibilityFence; import com.google.common.collect.Lists; +import org.slf4j.Logger; + public class TephraTransactionContext implements PhoenixTransactionContext { private final List<TransactionAware> txAwares; @@ -32,24 +34,26 @@ public class TephraTransactionContext implements PhoenixTransactionContext { private TransactionSystemClient txServiceClient; private TransactionFailureException e; - public TephraTransactionContext(PhoenixTransactionContext ctx, PhoenixConnection connection, boolean threadSafe) { + public TephraTransactionContext(PhoenixConnection connection) { + this.txServiceClient = connection.getQueryServices().getTransactionSystemClient(); + this.txAwares = Collections.emptyList(); + this.txContext = new TransactionContext(txServiceClient); + } + + public TephraTransactionContext(PhoenixTransactionContext ctx, PhoenixConnection connection, boolean subTask) { this.txServiceClient = connection.getQueryServices().getTransactionSystemClient(); assert(ctx instanceof TephraTransactionContext); TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx; - if (threadSafe) { + if (subTask) { this.tx = tephraTransactionContext.getTransaction(); this.txAwares = Lists.newArrayList(); this.txContext = null; } else { this.txAwares = Collections.emptyList(); - if (ctx == null) { - this.txContext = new TransactionContext(txServiceClient); - } else { - this.txContext = tephraTransactionContext.getContext(); - } + this.txContext = tephraTransactionContext.getContext(); } this.e = null; @@ -73,8 +77,12 @@ public class TephraTransactionContext implements PhoenixTransactionContext { @Override public void commit() throws SQLException { + + if (txContext == null || !isTransactionRunning()) { + return; + } + try { - assert(txContext != null); txContext.finish(); } catch (TransactionFailureException e) { this.e = e; @@ -93,6 +101,11 @@ public class TephraTransactionContext implements PhoenixTransactionContext { @Override public void abort() throws SQLException { + + if (txContext == null || !isTransactionRunning()) { + return; + } + try { if (e != null) { txContext.abort(e); @@ -125,6 +138,9 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } } + // Since we're querying our own table while mutating it, we must exclude + // see our current mutations, otherwise we can get erroneous results (for DELETE) + // or get into an infinite loop (for UPSERT SELECT). if (txContext == null) { tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); } @@ -135,12 +151,16 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } @Override - public void commitDDLFence(PTable dataTable) throws SQLException, - InterruptedException, TimeoutException { + public void commitDDLFence(PTable dataTable, Logger logger) throws SQLException { byte[] key = dataTable.getName().getBytes(); + try { FenceWait fenceWait = VisibilityFence.prepareWait(key, txServiceClient); fenceWait.await(10000, TimeUnit.MILLISECONDS); + + if (logger.isInfoEnabled()) { + logger.info("Added write fence at ~" + getTransaction().getReadPointer()); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException(); @@ -156,11 +176,13 @@ public class TephraTransactionContext implements PhoenixTransactionContext { public void markDMLFence(PTable table) { byte[] logicalKey = table.getName().getBytes(); TransactionAware logicalTxAware = VisibilityFence.create(logicalKey); + if (this.txContext == null) { this.txAwares.add(logicalTxAware); } else { this.txContext.addTransactionAware(logicalTxAware); } + byte[] physicalKey = table.getPhysicalName().getBytes(); if (Bytes.compareTo(physicalKey, logicalKey) != 0) { TransactionAware physicalTxAware = VisibilityFence.create(physicalKey); @@ -233,6 +255,48 @@ public class TephraTransactionContext implements PhoenixTransactionContext { return (-1); } + // For testing + @Override + public long getWritePointer() { + if (this.txContext != null) { + return txContext.getCurrentTransaction().getWritePointer(); + } + + if (tx != null) { + return tx.getWritePointer(); + } + + return HConstants.LATEST_TIMESTAMP; + } + + // For testing + @Override + public PhoenixVisibilityLevel getVisibilityLevel() { + VisibilityLevel visibilityLevel = null; + + if (this.txContext != null) { + visibilityLevel = txContext.getCurrentTransaction().getVisibilityLevel(); + } else if (tx != null) { + visibilityLevel = tx.getVisibilityLevel(); + } + + PhoenixVisibilityLevel phoenixVisibilityLevel; + switch(visibilityLevel) { + case SNAPSHOT: + phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT; + break; + case SNAPSHOT_EXCLUDE_CURRENT: + phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; + break; + case SNAPSHOT_ALL: + phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_ALL; + default: + phoenixVisibilityLevel = null; + } + + return phoenixVisibilityLevel; + } + /** * TephraTransactionContext specific functions */ @@ -254,32 +318,8 @@ public class TephraTransactionContext implements PhoenixTransactionContext { txContext.addTransactionAware(txAware); } else if (this.tx != null) { txAwares.add(txAware); + assert(tx != null); + txAware.startTx(tx); } } - - // For testing - public long getWritePointer() { - if (this.txContext != null) { - return txContext.getCurrentTransaction().getWritePointer(); - } - - if (tx != null) { - return tx.getWritePointer(); - } - - return HConstants.LATEST_TIMESTAMP; - } - - // For testing - public VisibilityLevel getVisibilityLevel() { - if (this.txContext != null) { - return txContext.getCurrentTransaction().getVisibilityLevel(); - } - - if (tx != null) { - return tx.getVisibilityLevel(); - } - - return null; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa69563e/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java index 50ea600..e33a280 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.tephra.TxConstants; import org.apache.tephra.hbase.TransactionAwareHTable; import com.google.protobuf.Descriptors.MethodDescriptor; @@ -37,12 +38,16 @@ public class TephraTransactionTable implements PhoenixTransactionalTable { private TephraTransactionContext tephraTransactionContext; public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) { + this(ctx, hTable, false); + } + + public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable, boolean isImmutableRows) { assert(ctx instanceof TephraTransactionContext); tephraTransactionContext = (TephraTransactionContext) ctx; - transactionAwareHTable = new TransactionAwareHTable(hTable); + transactionAwareHTable = new TransactionAwareHTable(hTable, isImmutableRows ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); tephraTransactionContext.addTransactionAware(transactionAwareHTable); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa69563e/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java index 04882e0..4fbbe57 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java @@ -29,6 +29,9 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionalTable; +import org.apache.phoenix.transaction.TephraTransactionTable; import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionFailureException; import org.apache.tephra.TxConstants; @@ -50,23 +53,8 @@ public class TransactionUtil { return serverTimeStamp / TxConstants.MAX_TX_PER_MS; } - public static SQLException getTransactionFailureException(TransactionFailureException e) { - if (e instanceof TransactionConflictException) { - return new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION) - .setMessage(e.getMessage()) - .setRootCause(e) - .build().buildException(); - - } - return new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) - .setMessage(e.getMessage()) - .setRootCause(e) - .build().buildException(); - } - - public static TransactionAwareHTable getTransactionAwareHTable(HTableInterface htable, boolean isImmutableRows) { - // Conflict detection is not needed for tables with write-once/append-only data - return new TransactionAwareHTable(htable, isImmutableRows ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); + public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, boolean isImmutableRows) { + return new TephraTransactionTable(phoenixTransactionContext, htable, isImmutableRows); } // we resolve transactional tables at the txn read pointer
