Saving partial results

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fa69563e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fa69563e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fa69563e

Branch: refs/heads/omid
Commit: fa69563e51fbebdf14d5af610506dd56b8289ec4
Parents: d2c1653
Author: Ohad Shacham <[email protected]>
Authored: Mon Mar 13 12:22:51 2017 +0200
Committer: Ohad Shacham <[email protected]>
Committed: Mon Mar 13 12:22:51 2017 +0200

----------------------------------------------------------------------
 .../apache/phoenix/execute/MutationState.java   | 309 +++++++------------
 .../transaction/OmidTransactionContext.java     |  15 +-
 .../transaction/PhoenixTransactionContext.java  |  27 +-
 .../transaction/TephraTransactionContext.java   | 112 ++++---
 .../transaction/TephraTransactionTable.java     |   7 +-
 .../apache/phoenix/util/TransactionUtil.java    |  22 +-
 6 files changed, 230 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa69563e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 4775d59..c480e30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -84,6 +84,10 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import 
org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.transaction.PhoenixTransactionalTable;
+import org.apache.phoenix.transaction.TephraTransactionContext;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
@@ -123,54 +127,53 @@ public class MutationState implements SQLCloseable {
     private static final TransactionCodec CODEC = new TransactionCodec();
     private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0];
     private static final int MAX_COMMIT_RETRIES = 3;
-    
+
     private final PhoenixConnection connection;
     private final long maxSize;
     private final long maxSizeBytes;
     private long batchCount = 0L;
     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
mutations;
-    private final List<TransactionAware> txAwares;
-    private final TransactionContext txContext;
     private final Set<String> uncommittedPhysicalNames = 
Sets.newHashSetWithExpectedSize(10);
-    
-    private Transaction tx;
+
     private long sizeOffset;
     private int numRows = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
     private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations 
= Collections.emptyMap();
-    
+
+    final PhoenixTransactionContext phoenixTransactionContext;
+
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
 
     public MutationState(long maxSize, PhoenixConnection connection) {
-        this(maxSize,connection, null, null);
+        this(maxSize,connection, false, null);
     }
-    
-    public MutationState(long maxSize, PhoenixConnection connection, 
TransactionContext txContext) {
-        this(maxSize,connection, null, txContext);
+
+    public MutationState(long maxSize, PhoenixConnection connection, 
PhoenixTransactionContext txContext) {
+        this(maxSize,connection, false, txContext);
     }
-    
+
     public MutationState(MutationState mutationState) {
-        this(mutationState.maxSize, mutationState.connection, 
mutationState.getTransaction(), null);
+        this(mutationState.maxSize, mutationState.connection, true, 
mutationState.getPhoenixTransactionContext());
     }
-    
+
     public MutationState(long maxSize, PhoenixConnection connection, long 
sizeOffset) {
-        this(maxSize, connection, null, null, sizeOffset);
+        this(maxSize, connection, false, null, sizeOffset);
     }
-    
-    private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx, TransactionContext txContext) {
-        this(maxSize,connection, tx, txContext, 0);
+
+    private MutationState(long maxSize, PhoenixConnection connection, boolean 
subTask, PhoenixTransactionContext txContext) {
+        this(maxSize,connection, subTask, txContext, 0);
     }
-    
-    private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx, TransactionContext txContext, long sizeOffset) {
-        this(maxSize, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, 
txContext);
+
+    private MutationState(long maxSize, PhoenixConnection connection, boolean 
subTask, PhoenixTransactionContext txContext, long sizeOffset) {
+        this(maxSize, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), subTask, 
txContext);
         this.sizeOffset = sizeOffset;
     }
-    
+
     MutationState(long maxSize, PhoenixConnection connection,
             Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
-            Transaction tx, TransactionContext txContext) {
+            boolean subTask, PhoenixTransactionContext txContext) {
         this.maxSize = maxSize;
         this.connection = connection;
         this.maxSizeBytes = connection.getMutateBatchSizeBytes();
@@ -178,30 +181,24 @@ public class MutationState implements SQLCloseable {
         boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
         this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
                 : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
-        this.tx = tx;
-        if (tx == null) {
-            this.txAwares = Collections.emptyList();
+        if (subTask == false) {
             if (txContext == null) {
-                TransactionSystemClient txServiceClient = this.connection
-                        .getQueryServices().getTransactionSystemClient();
-                this.txContext = new TransactionContext(txServiceClient);
+                phoenixTransactionContext = new 
TephraTransactionContext(connection);
             } else {
                 isExternalTxContext = true;
-                this.txContext = txContext;
+                phoenixTransactionContext = new 
TephraTransactionContext(txContext, connection, subTask);
             }
         } else {
             // this code path is only used while running child scans, we can't 
pass the txContext to child scans
             // as it is not thread safe, so we use the tx member variable
-            this.txAwares = Lists.newArrayList();
-            this.txContext = null;
+            phoenixTransactionContext = new 
TephraTransactionContext(txContext, connection, subTask);
         }
     }
 
     public MutationState(TableRef table, 
Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long 
maxSize, PhoenixConnection connection) {
-        this(maxSize, connection, null, null, sizeOffset);
+        this(maxSize, connection, true, 
connection.getMutationState().getPhoenixTransactionContext(), sizeOffset);
         this.mutations.put(table, mutations);
         this.numRows = mutations.size();
-        this.tx = connection.getMutationState().getTransaction();
         throwIfTooBig();
     }
     
@@ -209,6 +206,10 @@ public class MutationState implements SQLCloseable {
         return maxSize;
     }
     
+    public PhoenixTransactionContext getPhoenixTransactionContext() {
+        return phoenixTransactionContext;
+    }
+    
     /**
      * Commit a write fence when creating an index so that we can detect
      * when a data table transaction is started before the create index
@@ -219,33 +220,16 @@ public class MutationState implements SQLCloseable {
      * @param dataTable the data table upon which an index is being added
      * @throws SQLException
      */
-    public void commitDDLFence(PTable dataTable) throws SQLException {
+    public void commitDDLFence(PTable dataTable, Logger logger) throws 
SQLException {
         if (dataTable.isTransactional()) {
-            byte[] key = dataTable.getName().getBytes();
-            boolean success = false;
             try {
-                FenceWait fenceWait = VisibilityFence.prepareWait(key, 
connection.getQueryServices().getTransactionSystemClient());
-                fenceWait.await(10000, TimeUnit.MILLISECONDS);
-                success = true;
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
-            } catch (TimeoutException | TransactionFailureException e) {
-                throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
-                .setSchemaName(dataTable.getSchemaName().getString())
-                .setTableName(dataTable.getTableName().getString())
-                .build().buildException();
+                phoenixTransactionContext.commitDDLFence(dataTable, logger);
             } finally {
                 // The client expects a transaction to be in progress on the 
txContext while the
                 // VisibilityFence.prepareWait() starts a new tx and 
finishes/aborts it. After it's
                 // finished, we start a new one here.
                 // TODO: seems like an autonomous tx capability in Tephra 
would be useful here.
-                try {
-                    txContext.start();
-                    if (logger.isInfoEnabled() && success) logger.info("Added 
write fence at ~" + getTransaction().getReadPointer());
-                } catch (TransactionFailureException e) {
-                    throw TransactionUtil.getTransactionFailureException(e);
-                }
+                phoenixTransactionContext.begin();
             }
         }
     }
@@ -262,27 +246,12 @@ public class MutationState implements SQLCloseable {
         if (table.getType() == PTableType.INDEX || !table.isTransactional()) {
             return;
         }
-        byte[] logicalKey = table.getName().getBytes();
-        TransactionAware logicalTxAware = VisibilityFence.create(logicalKey);
-        if (this.txContext == null) {
-            this.txAwares.add(logicalTxAware);
-        } else {
-            this.txContext.addTransactionAware(logicalTxAware);
-        }
-        byte[] physicalKey = table.getPhysicalName().getBytes();
-        if (Bytes.compareTo(physicalKey, logicalKey) != 0) {
-            TransactionAware physicalTxAware = 
VisibilityFence.create(physicalKey);
-            if (this.txContext == null) {
-                this.txAwares.add(physicalTxAware);
-            } else {
-                this.txContext.addTransactionAware(physicalTxAware);
-            }
-        }
+
+        phoenixTransactionContext.markDMLFence(table);
     }
     
     public boolean checkpointIfNeccessary(MutationPlan plan) throws 
SQLException {
-        Transaction currentTx = getTransaction();
-        if (getTransaction() == null || plan.getTargetRef() == null || 
plan.getTargetRef().getTable() == null || 
!plan.getTargetRef().getTable().isTransactional()) {
+        if (! phoenixTransactionContext.isTransactionRunning()  || 
plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || 
!plan.getTargetRef().getTable().isTransactional()) {
             return false;
         }
         Set<TableRef> sources = plan.getSourceRefs();
@@ -322,40 +291,14 @@ public class MutationState implements SQLCloseable {
                     break;
                 }
             }
-            if (hasUncommittedData) {
-                try {
-                    if (txContext == null) {
-                        currentTx = tx = 
connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
-                    }  else {
-                        txContext.checkpoint();
-                        currentTx = tx = txContext.getCurrentTransaction();
-                    }
-                    // Since we've checkpointed, we can clear out uncommitted 
set, since a statement run afterwards
-                    // should see all this data.
-                    uncommittedPhysicalNames.clear();
-                } catch (TransactionFailureException e) {
-                    throw new SQLException(e);
-                } 
-            }
-            // Since we're querying our own table while mutating it, we must 
exclude
-            // see our current mutations, otherwise we can get erroneous 
results (for DELETE)
-            // or get into an infinite loop (for UPSERT SELECT).
-            currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+
+            phoenixTransactionContext.checkpoint(hasUncommittedData);
+
             return true;
         }
         return false;
     }
-    
-    private void addTransactionParticipant(TransactionAware txAware) throws 
SQLException {
-        if (txContext == null) {
-            txAwares.add(txAware);
-            assert(tx != null);
-            txAware.startTx(tx);
-        } else {
-            txContext.addTransactionAware(txAware);
-        }
-    }
-    
+
     // Though MutationState is not thread safe in general, this method should 
be because it may
     // be called by TableResultIterator in a multi-threaded manner. Since we 
do not want to expose
     // the Transaction outside of MutationState, this seems reasonable, as the 
member variables
@@ -372,68 +315,52 @@ public class MutationState implements SQLCloseable {
         }
         return htable;
     }
-    
+
     public PhoenixConnection getConnection() {
         return connection;
     }
-    
-    // Kept private as the Transaction may change when check pointed. Keeping 
it private ensures
-    // no one holds on to a stale copy.
-    private Transaction getTransaction() {
-        return tx != null ? tx : txContext != null ? 
txContext.getCurrentTransaction() : null;
-    }
-    
+
     public boolean isTransactionStarted() {
-        return getTransaction() != null;
+        return phoenixTransactionContext.isTransactionRunning();
     }
-    
+
     public long getInitialWritePointer() {
-        Transaction tx = getTransaction();
-        return tx == null ? HConstants.LATEST_TIMESTAMP : 
tx.getTransactionId(); // First write pointer - won't change with checkpointing
+        return phoenixTransactionContext.getTransactionId(); // First write 
pointer - won't change with checkpointing
     }
-    
+
     // For testing
     public long getWritePointer() {
-        Transaction tx = getTransaction();
-        return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getWritePointer();
+        return phoenixTransactionContext.getWritePointer();
     }
-    
+
     // For testing
-    public VisibilityLevel getVisibilityLevel() {
-        Transaction tx = getTransaction();
-        return tx == null ? null : tx.getVisibilityLevel();
+    public PhoenixVisibilityLevel getVisibilityLevel() {
+        return phoenixTransactionContext.getVisibilityLevel();
     }
-    
+
     public boolean startTransaction() throws SQLException {
-        if (txContext == null) {
-            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
-        }
-        
         if (connection.getSCN() != null) {
             throw new SQLExceptionInfo.Builder(
                     SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
                     .build().buildException();
         }
-        
-        try {
-            if (!isTransactionStarted()) {
-                // Clear any transactional state in case transaction was ended 
outside
-                // of Phoenix so we don't carry the old transaction state 
forward. We
-                // cannot call reset() here due to the case of having 
mutations and
-                // then transitioning from non transactional to transactional 
(which
-                // would end up clearing our uncommitted state).
-                resetTransactionalState();
-                txContext.start();
-                return true;
-            }
-        } catch (TransactionFailureException e) {
-            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED).setRootCause(e).build().buildException();
+
+        if (!isTransactionStarted()) {
+            // Clear any transactional state in case transaction was ended 
outside
+            // of Phoenix so we don't carry the old transaction state forward. 
We
+            // cannot call reset() here due to the case of having mutations and
+            // then transitioning from non transactional to transactional 
(which
+            // would end up clearing our uncommitted state).
+            resetTransactionalState();
+            phoenixTransactionContext.begin();
+            return true;
         }
+
         return false;
     }
 
     public static MutationState emptyMutationState(long maxSize, 
PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, connection, 
Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), 
null, null);
+        MutationState state = new MutationState(maxSize, connection, 
Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), 
false, null);
         state.sizeOffset = 0;
         return state;
     }
@@ -512,13 +439,9 @@ public class MutationState implements SQLCloseable {
         if (this == newMutationState) { // Doesn't make sense
             return;
         }
-        if (txContext != null) {
-            for (TransactionAware txAware : newMutationState.txAwares) {
-                txContext.addTransactionAware(txAware);
-            }
-        } else {
-            txAwares.addAll(newMutationState.txAwares);
-        }
+
+        phoenixTransactionContext.join(getPhoenixTransactionContext());
+
         this.sizeOffset += newMutationState.sizeOffset;
         joinMutationState(newMutationState.mutations, this.mutations);
         if (!newMutationState.txMutations.isEmpty()) {
@@ -535,7 +458,7 @@ public class MutationState implements SQLCloseable {
         }
         throwIfTooBig();
     }
-    
+
 
     private static ImmutableBytesPtr 
getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable 
table) {
         RowKeySchema schema = table.getRowKeySchema();
@@ -1054,24 +977,15 @@ public class MutationState implements SQLCloseable {
                             txTableRefs.add(origTableRef);
                             addDMLFence(table);
                             
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
-                            
+
                             // If we have indexes, wrap the HTable in a 
delegate HTable that
                             // will attach the necessary index meta data in 
the event of a
                             // rollback
                             if (!table.getIndexes().isEmpty()) {
                                 hTable = new MetaDataAwareHTable(hTable, 
origTableRef);
                             }
-                            TransactionAwareHTable txnAware = 
TransactionUtil.getTransactionAwareHTable(hTable, table.isImmutableRows());
-                            // Don't add immutable indexes (those are the only 
ones that would participate
-                            // during a commit), as we don't need conflict 
detection for these.
-                            if (tableInfo.isDataTable()) {
-                                // Even for immutable, we need to do this so 
that an abort has the state
-                                // necessary to generate the rows to delete.
-                                addTransactionParticipant(txnAware);
-                            } else {
-                                txnAware.startTx(getTransaction());
-                            }
-                            hTable = txnAware;
+
+                            hTable = 
TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, 
table.isImmutableRows());                          
                         }
                         
                         long numMutations = mutationList.size();
@@ -1261,29 +1175,22 @@ public class MutationState implements SQLCloseable {
         this.mutations.clear();
         resetTransactionalState();
     }
-    
+
     private void resetTransactionalState() {
-        tx = null;
-        txAwares.clear();
+        phoenixTransactionContext.reset();
         txMutations = Collections.emptyMap();
         uncommittedPhysicalNames.clear();
         uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     }
-    
+
     public void rollback() throws SQLException {
         try {
-            if (txContext != null && isTransactionStarted()) {
-                try {
-                    txContext.abort();
-                } catch (TransactionFailureException e) {
-                    throw TransactionUtil.getTransactionFailureException(e);
-                }
-            }
+            phoenixTransactionContext.abort();
         } finally {
             resetState();
         }
     }
-    
+
     public void commit() throws SQLException {
         Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = 
Collections.emptyMap();
         int retryCount = 0;
@@ -1299,38 +1206,32 @@ public class MutationState implements SQLCloseable {
                 sqlE = e;
             } finally {
                 try {
-                    if (txContext != null && isTransactionStarted()) {
-                        TransactionFailureException txFailure = null;
-                        boolean finishSuccessful=false;
-                        try {
-                            if (sendSuccessful) {
-                                txContext.finish();
-                                finishSuccessful = true;
-                            }
-                        } catch (TransactionFailureException e) {
-                            if (logger.isInfoEnabled()) 
logger.info(e.getClass().getName() + " at timestamp " + 
getInitialWritePointer() + " with retry count of " + retryCount);
-                            retryCommit = (e instanceof 
TransactionConflictException && retryCount < MAX_COMMIT_RETRIES);
-                            txFailure = e;
-                            SQLException nextE = 
TransactionUtil.getTransactionFailureException(e);
-                            if (sqlE == null) {
-                                sqlE = nextE;
-                            } else {
-                                sqlE.setNextException(nextE);
-                            }
-                        } finally {
-                            // If send fails or finish fails, abort the tx
-                            if (!finishSuccessful) {
-                                try {
-                                    txContext.abort(txFailure);
-                                    if (logger.isInfoEnabled()) 
logger.info("Abort successful");
-                                } catch (TransactionFailureException e) {
-                                    if (logger.isInfoEnabled()) 
logger.info("Abort failed with " + e);
-                                    SQLException nextE = 
TransactionUtil.getTransactionFailureException(e);
-                                    if (sqlE == null) {
-                                        sqlE = nextE;
-                                    } else {
-                                        sqlE.setNextException(nextE);
-                                    }
+                    boolean finishSuccessful=false;
+                    try {
+                        if (sendSuccessful) {
+                            phoenixTransactionContext.commit();
+                            finishSuccessful = true;
+                        }
+                    } catch (SQLException e) {
+                        if (logger.isInfoEnabled()) 
logger.info(e.getClass().getName() + " at timestamp " + 
getInitialWritePointer() + " with retry count of " + retryCount);
+                        retryCommit = (e.getErrorCode() == 
SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode() && retryCount < 
MAX_COMMIT_RETRIES);
+                        if (sqlE == null) {
+                            sqlE = e;
+                        } else {
+                            sqlE.setNextException(e);
+                        }
+                    } finally {
+                        // If send fails or finish fails, abort the tx
+                        if (!finishSuccessful) {
+                            try {
+                                phoenixTransactionContext.abort();
+                                if (logger.isInfoEnabled()) logger.info("Abort 
successful");
+                            } catch (SQLException e) {
+                                if (logger.isInfoEnabled()) logger.info("Abort 
failed with " + e);
+                                if (sqlE == null) {
+                                    sqlE = e;
+                                } else {
+                                    sqlE.setNextException(e);
                                 }
                             }
                         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa69563e/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index 937ac14..596cf73 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -4,6 +4,7 @@ import java.sql.SQLException;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.phoenix.schema.PTable;
+import org.slf4j.Logger;
 
 public class OmidTransactionContext implements PhoenixTransactionContext {
 
@@ -32,8 +33,7 @@ public class OmidTransactionContext implements 
PhoenixTransactionContext {
     }
 
     @Override
-    public void commitDDLFence(PTable dataTable) throws SQLException,
-            InterruptedException, TimeoutException {
+    public void commitDDLFence(PTable dataTable, Logger logger) throws 
SQLException {
         // TODO Auto-generated method stub
 
     }
@@ -74,4 +74,15 @@ public class OmidTransactionContext implements 
PhoenixTransactionContext {
         return 0;
     }
 
+    @Override
+    public long getWritePointer() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public PhoenixVisibilityLevel getVisibilityLevel() {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa69563e/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index 87b68f9..2d0d5b7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -1,6 +1,8 @@
 package org.apache.phoenix.transaction;
 
 import org.apache.phoenix.schema.PTable;
+import org.apache.tephra.Transaction.VisibilityLevel;
+import org.slf4j.Logger;
 
 import java.sql.SQLException;
 import java.util.concurrent.TimeoutException;
@@ -8,6 +10,17 @@ import java.util.concurrent.TimeoutException;
 public interface PhoenixTransactionContext {
 
     /**
+     * 
+     * Visibility levels needed for checkpointing.
+     *
+     */
+    public enum PhoenixVisibilityLevel {
+        SNAPSHOT,
+        SNAPSHOT_EXCLUDE_CURRENT,
+        SNAPSHOT_ALL
+      }
+
+    /**
      * Starts a transaction
      *
      * @throws SQLException
@@ -43,8 +56,8 @@ public interface PhoenixTransactionContext {
      * @throws InterruptedException
      * @throws TimeoutException
      */
-    public void commitDDLFence(PTable dataTable)
-            throws SQLException, InterruptedException, TimeoutException;
+    public void commitDDLFence(PTable dataTable, Logger logger)
+            throws SQLException;
 
     /**
      * mark DML with table information for conflict detection of concurrent
@@ -80,4 +93,14 @@ public interface PhoenixTransactionContext {
      * Returns transaction snapshot id
      */
     long getReadPointer();
+
+    /**
+     * Returns transaction write pointer. After checkpoint the write pointer 
is different than the initial one  
+     */
+    long getWritePointer();
+
+    /**
+     * Returns visibility level 
+     */
+    PhoenixVisibilityLevel getVisibilityLevel();    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa69563e/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index 8fc5e0f..f8096d5 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -24,6 +24,8 @@ import org.apache.tephra.visibility.VisibilityFence;
 
 import com.google.common.collect.Lists;
 
+import org.slf4j.Logger;
+
 public class TephraTransactionContext implements PhoenixTransactionContext {
 
     private final List<TransactionAware> txAwares;
@@ -32,24 +34,26 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
     private TransactionSystemClient txServiceClient;
     private TransactionFailureException e;
 
-    public TephraTransactionContext(PhoenixTransactionContext ctx, 
PhoenixConnection connection, boolean threadSafe) {
 
+    public TephraTransactionContext(PhoenixConnection connection) {
+        this.txServiceClient = 
connection.getQueryServices().getTransactionSystemClient();
+        this.txAwares = Collections.emptyList();
+        this.txContext = new TransactionContext(txServiceClient);
+    }
+
+    public TephraTransactionContext(PhoenixTransactionContext ctx, 
PhoenixConnection connection, boolean subTask) {
         this.txServiceClient = 
connection.getQueryServices().getTransactionSystemClient();
 
         assert(ctx instanceof TephraTransactionContext);
         TephraTransactionContext tephraTransactionContext = 
(TephraTransactionContext) ctx;
 
-        if (threadSafe) {
+        if (subTask) {
             this.tx = tephraTransactionContext.getTransaction();
             this.txAwares = Lists.newArrayList();
             this.txContext = null;
         } else {
             this.txAwares = Collections.emptyList();
-            if (ctx == null) {
-                this.txContext = new TransactionContext(txServiceClient);
-            } else {
-                this.txContext = tephraTransactionContext.getContext();
-            }
+            this.txContext = tephraTransactionContext.getContext();
         }
 
         this.e = null;
@@ -73,8 +77,12 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
 
     @Override
     public void commit() throws SQLException {
+        
+        if (txContext == null || !isTransactionRunning()) {
+            return;
+        }
+        
         try {
-            assert(txContext != null);
             txContext.finish();
         } catch (TransactionFailureException e) {
             this.e = e;
@@ -93,6 +101,11 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
 
     @Override
     public void abort() throws SQLException {
+        
+        if (txContext == null || !isTransactionRunning()) {
+            return;
+        }
+            
         try {
             if (e != null) {
                 txContext.abort(e);
@@ -125,6 +138,9 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
             }
         }
 
+        // Since we're querying our own table while mutating it, we must 
exclude
+        // our current mutations, otherwise we can get erroneous results 
(for DELETE)
+        // or get into an infinite loop (for UPSERT SELECT).
         if (txContext == null) {
             tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
         }
@@ -135,12 +151,16 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
     }
 
     @Override
-    public void commitDDLFence(PTable dataTable) throws SQLException,
-            InterruptedException, TimeoutException {
+    public void commitDDLFence(PTable dataTable, Logger logger) throws 
SQLException {
         byte[] key = dataTable.getName().getBytes();
+
         try {
             FenceWait fenceWait = VisibilityFence.prepareWait(key, 
txServiceClient);
             fenceWait.await(10000, TimeUnit.MILLISECONDS);
+            
+            if (logger.isInfoEnabled()) {
+                logger.info("Added write fence at ~" + 
getTransaction().getReadPointer());
+            }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
@@ -156,11 +176,13 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
     public void markDMLFence(PTable table) {
         byte[] logicalKey = table.getName().getBytes();
         TransactionAware logicalTxAware = VisibilityFence.create(logicalKey);
+
         if (this.txContext == null) {
             this.txAwares.add(logicalTxAware);
         } else {
             this.txContext.addTransactionAware(logicalTxAware);
         }
+
         byte[] physicalKey = table.getPhysicalName().getBytes();
         if (Bytes.compareTo(physicalKey, logicalKey) != 0) {
             TransactionAware physicalTxAware = 
VisibilityFence.create(physicalKey);
@@ -233,6 +255,48 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
         return (-1);
     }
 
+    // For testing: write pointer of txContext's current transaction when a context exists, otherwise of tx
+    @Override
+    public long getWritePointer() {
+        if (this.txContext != null) {
+            return txContext.getCurrentTransaction().getWritePointer();
+        }
+
+        if (tx != null) {
+            return tx.getWritePointer();
+        }
+
+        return HConstants.LATEST_TIMESTAMP; // neither txContext nor tx is set
+    }
+
+    // For testing: translates the Tephra visibility level of txContext's current transaction
+    // (or of tx when there is no context) to its Phoenix counterpart; null if neither is set.
+    @Override
+    public PhoenixVisibilityLevel getVisibilityLevel() {
+        VisibilityLevel visibilityLevel = null;
+        if (this.txContext != null) {
+            visibilityLevel = txContext.getCurrentTransaction().getVisibilityLevel();
+        } else if (tx != null) {
+            visibilityLevel = tx.getVisibilityLevel();
+        }
+
+        // Switching on a null enum throws NullPointerException, so answer null up front.
+        if (visibilityLevel == null) {
+            return null;
+        }
+
+        switch (visibilityLevel) {
+        case SNAPSHOT:
+            return PhoenixVisibilityLevel.SNAPSHOT;
+        case SNAPSHOT_EXCLUDE_CURRENT:
+            return PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+        case SNAPSHOT_ALL:
+            return PhoenixVisibilityLevel.SNAPSHOT_ALL;
+        default:
+            return null;
+        }
+    }
+
    /**
     * TephraTransactionContext specific functions
     */
@@ -254,32 +318,8 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
             txContext.addTransactionAware(txAware);
         } else if (this.tx != null) {
             txAwares.add(txAware);
+            assert(tx != null);
+            txAware.startTx(tx);
         }
     }
-
-    // For testing
-    public long getWritePointer() {
-        if (this.txContext != null) {
-            return txContext.getCurrentTransaction().getWritePointer();
-        }
-
-        if (tx != null) {
-            return tx.getWritePointer();
-        }
-
-        return HConstants.LATEST_TIMESTAMP;
-    }
-
-    // For testing
-    public VisibilityLevel getVisibilityLevel() {
-        if (this.txContext != null) {
-            return txContext.getCurrentTransaction().getVisibilityLevel();
-        }
-
-        if (tx != null) {
-            return tx.getVisibilityLevel();
-        }
-
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa69563e/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
index 50ea600..e33a280 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.tephra.TxConstants;
 import org.apache.tephra.hbase.TransactionAwareHTable;
 
 import com.google.protobuf.Descriptors.MethodDescriptor;
@@ -37,12 +38,16 @@ public class TephraTransactionTable implements 
PhoenixTransactionalTable {
     private TephraTransactionContext tephraTransactionContext;
 
     public TephraTransactionTable(PhoenixTransactionContext ctx, 
HTableInterface hTable) {
+        this(ctx, hTable, false); // isImmutableRows = false, so ROW conflict detection is used
+    }
+
+    public TephraTransactionTable(PhoenixTransactionContext ctx, 
HTableInterface hTable, boolean isImmutableRows) {
 
         assert(ctx instanceof TephraTransactionContext);
 
         tephraTransactionContext = (TephraTransactionContext) ctx;
 
-        transactionAwareHTable = new TransactionAwareHTable(hTable);
+        transactionAwareHTable = new TransactionAwareHTable(hTable, 
isImmutableRows ? TxConstants.ConflictDetection.NONE : 
TxConstants.ConflictDetection.ROW); // conflict detection is not needed for tables with write-once/append-only data
 
         tephraTransactionContext.addTransactionAware(transactionAwareHTable);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa69563e/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 04882e0..4fbbe57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -29,6 +29,9 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionalTable;
+import org.apache.phoenix.transaction.TephraTransactionTable;
 import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionFailureException;
 import org.apache.tephra.TxConstants;
@@ -50,23 +53,8 @@ public class TransactionUtil {
         return serverTimeStamp / TxConstants.MAX_TX_PER_MS;
     }
     
-    public static SQLException 
getTransactionFailureException(TransactionFailureException e) {
-        if (e instanceof TransactionConflictException) { 
-            return new 
SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
-                .setMessage(e.getMessage())
-                .setRootCause(e)
-                .build().buildException();
-
-        }
-        return new 
SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
-            .setMessage(e.getMessage())
-            .setRootCause(e)
-            .build().buildException();
-    }
-    
-    public static TransactionAwareHTable 
getTransactionAwareHTable(HTableInterface htable, boolean isImmutableRows) {
-       // Conflict detection is not needed for tables with 
write-once/append-only data
-       return new TransactionAwareHTable(htable, isImmutableRows ? 
TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
+    public static PhoenixTransactionalTable 
getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, 
HTableInterface htable, boolean isImmutableRows) {
+        return new TephraTransactionTable(phoenixTransactionContext, htable, 
isImmutableRows); // always returns the Tephra-backed implementation
     }
     
     // we resolve transactional tables at the txn read pointer

Reply via email to