Repository: phoenix
Updated Branches:
  refs/heads/txn acacaf340 -> d81e660e3


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
index f94ce39..8a4f376 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
@@ -41,33 +41,23 @@ public class TxPointInTimeQueryIT extends 
BaseClientManagedTimeIT {
        public void initTable() throws Exception {
                ts = nextTimestamp();
        }
-       
+
        @Test
        public void testQueryWithSCN() throws Exception {
                Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
                props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
-               Connection conn = DriverManager.getConnection(getUrl(), props);
-               try {
-                       conn.createStatement()
-                                       .execute(
-                                                       "CREATE TABLE t (k 
VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true");
-
-                       props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts + 10));
-                       conn = DriverManager.getConnection(getUrl(), props);
-
-                       String selectQuery = "SELECT k FROM t";
+               try (Connection conn = DriverManager.getConnection(getUrl(), 
props);) {
                        try {
-                               
conn.createStatement().executeQuery(selectQuery);
+                               conn.createStatement()
+                                               .execute(
+                                                               "CREATE TABLE t 
(k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true");
                                fail();
                        } catch (SQLException e) {
                                assertEquals("Unexpected Exception",
-                                               
SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET.getErrorCode(),
-                                               e.getErrorCode());
+                                               
SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET
+                                                               
.getErrorCode(), e.getErrorCode());
                        }
-
-               } finally {
-                       conn.close();
                }
        }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index db98e25..855915d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -347,7 +347,7 @@ public class FromCompiler {
             PTable theTable = null;
             if (updateCacheImmediately || connection.getAutoCommit()) {
                 MetaDataMutationResult result = client.updateCache(schemaName, 
tableName);
-                timeStamp = TransactionUtil.getTableTimestamp(connection, 
result);
+                timeStamp = TransactionUtil.getResolvedTimestamp(connection, 
result);
                 theTable = result.getTable();
                 if (theTable == null) {
                     throw new TableNotFoundException(schemaName, tableName, 
timeStamp);
@@ -367,7 +367,7 @@ public class FromCompiler {
                 if (theTable == null) {
                     MetaDataMutationResult result = 
client.updateCache(schemaName, tableName);
                     if (result.wasUpdated()) {
-                       timeStamp = 
TransactionUtil.getTableTimestamp(connection, result);
+                       timeStamp = 
TransactionUtil.getResolvedTimestamp(connection, result);
                         theTable = result.getTable();
                     }
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 5d24f7e..ee724fc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -774,7 +774,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
                 // without making an additional query
                 PTable table =
                         loadTable(env, key, cacheKey, clientTimeStamp, 
HConstants.LATEST_TIMESTAMP);
-                if (table != null) {
+                if (table != null && !isTableDeleted(table)) {
                     if (table.getTimeStamp() < clientTimeStamp) {
                         // If the table is older than the client time stamp 
and it's deleted,
                         // continue
@@ -803,7 +803,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
                 // on the system table. This is an issue because of the way we 
manage batch mutation
                 // in the
                 // Indexer.
-                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> 
emptySet());
+                               region.mutateRowsWithLocks(tableMetadata, 
Collections.<byte[]> emptySet());
 
                 // Invalidate the cache - the next getTable call will add it
                 // TODO: consider loading the table that was just created 
here, patching up the parent table, and updating the cache

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index d89e19a..db50f83 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -250,7 +250,7 @@ public enum SQLExceptionCode {
     DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1056, "43A13", "Default column 
family not allowed on VIEW or shared INDEX"),
     ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL(1070, "44A01", "Only tables may 
be declared as transactional"),
     MAY_NOT_MAP_TO_EXISTING_TABLE_AS_TRANSACTIONAL(1071, "44A02", "An existing 
HBase table may not be mapped to as a transactional table"),
-       STORE_NULLS_MUST_BE_FALSE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls 
must be true when a table is transactional"),
+       STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls 
must be true when a table is transactional"),
     CANNOT_START_TRANSACTION_WITH_SCN_SET(1073, "44A04", "Cannot start a 
transaction on a connection with SCN set"),
 
     /** Sequence related */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 43fe28b..79da59f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import co.cask.tephra.Transaction;
@@ -196,13 +197,18 @@ public class MutationState implements SQLCloseable {
             }
             if (hasUncommittedData) {
                 try {
-                    tx = currentTx = 
connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
+                       if (txContext == null) {
+                               tx = currentTx = 
connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
+                       }  else {
+                               txContext.checkpoint();
+                               tx = currentTx = 
txContext.getCurrentTransaction();
+                       }
                     // Since we've checkpointed, we can clear out uncommitted 
set, since a statement run afterwards
                     // should see all this data.
                     uncommittedPhysicalNames.clear();
-                } catch (TransactionNotInProgressException e) {
+                } catch (TransactionFailureException | 
TransactionNotInProgressException e) {
                     throw new SQLException(e);
-                }
+                               } 
             }
             // Since we're querying our own table while mutating it, we must 
exclude
             // see our current mutations, otherwise we can get erroneous 
results (for DELETE)
@@ -367,37 +373,14 @@ public class MutationState implements SQLCloseable {
         throwIfTooBig();
     }
     
-    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final 
TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, 
long timestamp, boolean includeMutableIndexes) {
+    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final 
TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, 
final long timestamp, boolean includeMutableIndexes, final boolean sendAll) {
         final Iterator<PTable> indexes = // Only maintain tables with 
immutable rows through this client-side mechanism
                 (tableRef.getTable().isImmutableRows() || 
includeMutableIndexes) ? 
                         
IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator())
 : 
                         Iterators.<PTable>emptyIterator();
-        final List<Mutation> mutations = 
Lists.newArrayListWithExpectedSize(values.size());
+        final List<Mutation> mutationList = 
Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? 
Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
-        Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = 
values.entrySet().iterator();
-        final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        while (iterator.hasNext()) {
-            Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = 
iterator.next();
-            ImmutableBytesPtr key = rowEntry.getKey();
-            PRow row = 
tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key);
-            List<Mutation> rowMutations, rowMutationsPertainingToIndex;
-            if (rowEntry.getValue() == PRow.DELETE_MARKER) { // means delete
-                row.delete();
-                rowMutations = row.toRowMutations();
-                // Row deletes for index tables are processed by running a 
re-written query
-                // against the index table (as this allows for flexibility in 
being able to
-                // delete rows).
-                rowMutationsPertainingToIndex = Collections.emptyList();
-            } else {
-                for (Map.Entry<PColumn,byte[]> valueEntry : 
rowEntry.getValue().entrySet()) {
-                    row.setValue(valueEntry.getKey(), valueEntry.getValue());
-                }
-                rowMutations = row.toRowMutations();
-                rowMutationsPertainingToIndex = rowMutations;
-            }
-            mutations.addAll(rowMutations);
-            if (mutationsPertainingToIndex != null) 
mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
-        }
+        generateMutations(tableRef, timestamp, values, mutationList, 
mutationsPertainingToIndex);
         return new Iterator<Pair<byte[],List<Mutation>>>() {
             boolean isFirst = true;
 
@@ -410,14 +393,24 @@ public class MutationState implements SQLCloseable {
             public Pair<byte[], List<Mutation>> next() {
                 if (isFirst) {
                     isFirst = false;
-                    return new 
Pair<byte[],List<Mutation>>(tableRef.getTable().getPhysicalName().getBytes(),mutations);
+                    return new 
Pair<byte[],List<Mutation>>(tableRef.getTable().getPhysicalName().getBytes(),mutationList);
                 }
                 PTable index = indexes.next();
                 List<Mutation> indexMutations;
                 try {
                     indexMutations =
                             IndexUtil.generateIndexData(tableRef.getTable(), 
index, mutationsPertainingToIndex,
-                                    ptr, connection.getKeyValueBuilder(), 
connection);
+                                    connection.getKeyValueBuilder(), 
connection);
+                    // we may also have to include delete mutations for 
immutable tables if we are not processing all the tables in the mutations map
+                    if (!sendAll) {
+                           TableRef key = new TableRef(index);
+                                               Map<ImmutableBytesPtr, 
Map<PColumn, byte[]>> rowToColumnMap = mutations.remove(key);
+                           if (rowToColumnMap!=null) {
+                                   final List<Mutation> deleteMutations = 
Lists.newArrayList();
+                                   generateMutations(tableRef, timestamp, 
rowToColumnMap, deleteMutations, null);
+                                   indexMutations.addAll(deleteMutations);
+                           }
+                    }
                 } catch (SQLException e) {
                     throw new IllegalDataException(e);
                 }
@@ -431,6 +424,35 @@ public class MutationState implements SQLCloseable {
             
         };
     }
+
+       private void generateMutations(final TableRef tableRef, long timestamp,
+                       final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> 
values,
+                       final List<Mutation> mutationList,
+                       final List<Mutation> mutationsPertainingToIndex) {
+               Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
iterator = values.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = 
iterator.next();
+            ImmutableBytesPtr key = rowEntry.getKey();
+                       PRow row = 
tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key);
+                       List<Mutation> rowMutations, 
rowMutationsPertainingToIndex;
+                       if (rowEntry.getValue() == PRow.DELETE_MARKER) { // 
means delete
+                           row.delete();
+                           rowMutations = row.toRowMutations();
+                           // Row deletes for index tables are processed by 
running a re-written query
+                           // against the index table (as this allows for 
flexibility in being able to
+                           // delete rows).
+                           rowMutationsPertainingToIndex = 
Collections.emptyList();
+                       } else {
+                           for (Map.Entry<PColumn,byte[]> valueEntry : 
rowEntry.getValue().entrySet()) {
+                               row.setValue(valueEntry.getKey(), 
valueEntry.getValue());
+                           }
+                           rowMutations = row.toRowMutations();
+                           rowMutationsPertainingToIndex = rowMutations;
+                       }
+                       mutationList.addAll(rowMutations);
+                       if (mutationsPertainingToIndex != null) 
mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
+        }
+       }
     
     /**
      * Get the unsorted list of HBase mutations for the tables with 
uncommitted data.
@@ -453,7 +475,7 @@ public class MutationState implements SQLCloseable {
             private Iterator<Pair<byte[],List<Mutation>>> innerIterator = 
init();
                     
             private Iterator<Pair<byte[],List<Mutation>>> init() {
-                return addRowMutations(current.getKey(), current.getValue(), 
timestamp, includeMutableIndexes);
+                return addRowMutations(current.getKey(), current.getValue(), 
timestamp, includeMutableIndexes, true);
             }
             
             @Override
@@ -651,7 +673,7 @@ public class MutationState implements SQLCloseable {
             boolean isDataTable = true;
             // Validate as we go if transactional since we can undo if a 
problem occurs (which is unlikely)
             long serverTimestamp = serverTimeStamps == null ? 
validate(tableRef, valuesMap) : serverTimeStamps[i++];
-            Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, serverTimestamp, false);
+            Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
             while (mutationsIterator.hasNext()) {
                 Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
                 byte[] htableName = pair.getFirst();
@@ -855,13 +877,17 @@ public class MutationState implements SQLCloseable {
     }
     
     public void commit() throws SQLException {
+       boolean sendMutationsFailed=false;
         try {
             send();
+        } catch (Throwable t) {
+               sendMutationsFailed=true;
+               throw t;
         } finally {
             txAwares.clear();
             if (txContext != null) {
                 try {
-                    if (txStarted) {
+                    if (txStarted && !sendMutationsFailed) {
                         txContext.finish();
                     }
                 } catch (TransactionFailureException e) {
@@ -872,8 +898,10 @@ public class MutationState implements SQLCloseable {
                         throw TransactionUtil.getSQLException(e);
                     }
                 } finally {
-                       reset();
-                }
+                       if (!sendMutationsFailed) {
+                               reset();
+                       }
+                  }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index b0f87cb..bd0461d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -34,6 +34,7 @@ import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Delete;
@@ -83,6 +84,8 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
+import co.cask.tephra.TxConstants;
+
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -822,12 +825,15 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         int nDeleteCF = 0;
         int nDeleteVersionCF = 0;
         for (Cell kv : pendingUpdates) {
-               if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
-                   nDeleteCF++;
-               }
-               else if (kv.getTypeByte() == 
KeyValue.Type.DeleteFamilyVersion.getCode()) {
+               if (kv.getTypeByte() == 
KeyValue.Type.DeleteFamilyVersion.getCode()) {
                 nDeleteVersionCF++;
             }
+               else if (kv.getTypeByte() == 
KeyValue.Type.DeleteFamily.getCode()
+                               // Since we don't include the index rows in the 
change set for txn tables, we need to detect row deletes that have transformed 
by TransactionProcessor
+                               // TODO see if implement 
PhoenixTransactionalIndexer.preDelete will work instead of the following check
+                               || (CellUtil.matchingQualifier(kv, 
TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, 
HConstants.EMPTY_BYTE_ARRAY))) {
+                   nDeleteCF++;
+               }
         }
         // This is what a delete looks like on the server side for mutable 
indexing...
         // Should all be one or the other for DeleteFamily versus 
DeleteFamilyVersion, but just in case not
@@ -850,7 +856,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                Cell newValue = newState.get(ref);
                if (newValue != null) { // Indexed column has potentially 
changed
                    ImmutableBytesWritable oldValue = 
oldState.getLatestValue(ref);
-                       boolean newValueSetAsNull = (newValue.getTypeByte() == 
Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode());
+                       boolean newValueSetAsNull = (newValue.getTypeByte() == 
Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode() 
|| CellUtil.matchingValue(newValue, HConstants.EMPTY_BYTE_ARRAY));
                        //If the new column value has to be set as null and the 
older value is null too,
                        //then just skip to the next indexed column.
                        if (newValueSetAsNull && oldValue == null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 14f8a1f..2719119 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -86,13 +86,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         ptr.set(state.getCurrentRowKey());
         List<IndexUpdate> indexUpdates = Lists.newArrayList();
         for (IndexMaintainer maintainer : indexMaintainers) {
-            // Check both immutable and local, as for transactional tables, we 
use an index maintainer
+            // For transactional tables, we use an index maintainer
             // to aid in rollback if there's a KeyValue column in the index. 
The alternative would be
             // to hold on to all uncommitted index row keys (even ones already 
sent to HBase) on the
             // client side.
-            if (maintainer.isImmutableRows() && maintainer.isLocalIndex()) {
-                continue;
-            }
             Pair<ValueGetter, IndexUpdate> statePair = 
state.getIndexUpdateState(maintainer.getAllColumns());
             ValueGetter valueGetter = statePair.getFirst();
             IndexUpdate indexUpdate = statePair.getSecond();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 3d5da43..9f03747 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -17,6 +17,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import co.cask.tephra.Transaction;
@@ -28,11 +29,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -137,12 +141,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
 
         Map<String,byte[]> updateAttributes = m.getAttributesMap();
         PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
-        if (m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) == null) {
-               // Unless we're aborting the transaction, we do not want to see 
our own transaction writes,
-               // since index maintenance requires seeing the previously 
committed data in order to function
-               // properly.
-               
indexMetaData.getTransaction().setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
-        }
+        byte[] txRollbackAttribute = 
m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY);
         Collection<Pair<Mutation, byte[]>> indexUpdates = null;
         // get the current span, or just use a null-span to avoid a bunch of 
if statements
         try (TraceScope scope = Trace.startSpan("Starting to build index 
updates")) {
@@ -152,7 +151,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             }
 
             // get the index updates for all elements in this batch
-            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, 
getMutationIterator(miniBatchOp));
+            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, 
getMutationIterator(miniBatchOp), txRollbackAttribute);
 
             current.addTimelineAnnotation("Built index updates, doing 
preStep");
             TracingUtils.addAnnotation(current, "index update count", 
indexUpdates.size());
@@ -168,10 +167,10 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         }
     }
 
-    private Collection<Pair<Mutation, byte[]>> 
getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData 
indexMetaData, Iterator<Mutation> mutationIterator) throws IOException {
-        ResultScanner scanner = null;
+    private Collection<Pair<Mutation, byte[]>> 
getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData 
indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) 
throws IOException {
+        ResultScanner currentScanner = null;
+        ResultScanner previousScanner = null;
         TransactionAwareHTable txTable = null;
-        
         // Collect up all mutations in batch
         Map<ImmutableBytesPtr, MultiMutation> mutations =
                 new HashMap<ImmutableBytesPtr, MultiMutation>();
@@ -196,13 +195,11 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         List<IndexMaintainer> indexMaintainers = 
indexMetaData.getIndexMaintainers();
         Set<ColumnReference> mutableColumns = 
Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
         for (IndexMaintainer indexMaintainer : indexMaintainers) {
-            // Check both immutable and local, as for transactional tables, we 
use an index maintainer
+            // For transactional tables, we use an index maintainer
             // to aid in rollback if there's a KeyValue column in the index. 
The alternative would be
             // to hold on to all uncommitted index row keys (even ones already 
sent to HBase) on the
             // client side.
-            if (!indexMaintainer.isImmutableRows() || 
!indexMaintainer.isLocalIndex()) {
                 mutableColumns.addAll(indexMaintainer.getAllColumns());
-            }
         }
 
         Collection<Pair<Mutation, byte[]>> indexUpdates = new 
ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * 
indexMaintainers.size());
@@ -226,39 +223,23 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                 HTableInterface htable = env.getTable(tableName);
                 txTable = new TransactionAwareHTable(htable);
                 txTable.startTx(tx);
-                scanner = txTable.getScanner(scan);
-            }
-            ColumnReference emptyColRef = new 
ColumnReference(indexMaintainers.get(0).getDataEmptyKeyValueCF(), 
QueryConstants.EMPTY_COLUMN_BYTES);
-            if (scanner != null) {
-                Result result;
-                while ((result = scanner.next()) != null) {
-                    Mutation m = mutations.remove(new 
ImmutableBytesPtr(result.getRow()));
-                    byte[] attribValue = 
m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY);
-                    TxTableState state = new TxTableState(env, mutableColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
-                    Iterable<IndexUpdate> deletes = 
codec.getIndexDeletes(state, indexMetaData);
-                    for (IndexUpdate delete : deletes) {
-                        if (delete.isValid()) {
-                            
delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, 
attribValue);
-                            indexUpdates.add(new Pair<Mutation, 
byte[]>(delete.getUpdate(),delete.getTableName()));
-                        }
-                    }
-                    state.applyMutation();
-                    Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, 
indexMetaData);
-                    for (IndexUpdate put : puts) {
-                        if (put.isValid()) {
-                            indexUpdates.add(new Pair<Mutation, 
byte[]>(put.getUpdate(),put.getTableName()));
-                        }
-                    }
+                currentScanner = txTable.getScanner(scan);
+                if (txRollbackAttribute!=null) {
+                       
tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+                       previousScanner = txTable.getScanner(scan);
                 }
             }
+            // In case of rollback we have to do two scans, one with 
VisibilityLevel.SNAPSHOT to see the current state of the row 
+            // and another with VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT to 
see the previous state of the row
+            // so that we can rollback a previous delete + put 
+            processScanner(env, indexMetaData, txRollbackAttribute, 
previousScanner, mutations, tx, mutableColumns, indexUpdates, false);
+            processScanner(env, indexMetaData, txRollbackAttribute, 
currentScanner, mutations, tx, mutableColumns, indexUpdates, true);
             for (Mutation m : mutations.values()) {
-                TxTableState state = new TxTableState(env, mutableColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m);
-                state.applyMutation();
-                Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, 
indexMetaData);
-                for (IndexUpdate put : puts) {
-                    if (put.isValid()) {
-                        indexUpdates.add(new Pair<Mutation, 
byte[]>(put.getUpdate(),put.getTableName()));
-                    }
+               long timestamp = getTimestamp(txRollbackAttribute, 
tx.getWritePointer(), m);
+               TxTableState state = new TxTableState(env, mutableColumns, 
indexMetaData.getAttributes(), timestamp, m);
+               // if we did not generate valid put, we might have to generate 
a delete
+                if (!generatePuts(indexMetaData, indexUpdates, state)) {
+                       generateDeletes(indexMetaData, indexUpdates, 
txRollbackAttribute, state);
                 }
             }
         } finally {
@@ -268,6 +249,68 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         return indexUpdates;
     }
 
+       private void processScanner(RegionCoprocessorEnvironment env,
+                       PhoenixIndexMetaData indexMetaData, byte[] 
txRollbackAttribute,
+                       ResultScanner scanner,
+                       Map<ImmutableBytesPtr, MultiMutation> mutations, 
Transaction tx,
+                       Set<ColumnReference> mutableColumns,
+                       Collection<Pair<Mutation, byte[]>> indexUpdates, 
boolean removeMutation) throws IOException {
+               if (scanner != null) {
+                   Result result;
+                   ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(),
 QueryConstants.EMPTY_COLUMN_BYTES);
+                   while ((result = scanner.next()) != null) {
+                       Mutation m = removeMutation ? mutations.remove(new 
ImmutableBytesPtr(result.getRow())) : mutations.get(new 
ImmutableBytesPtr(result.getRow()));
+                       long timestamp = getTimestamp(txRollbackAttribute, 
tx.getWritePointer(), m);
+                       TxTableState state = new TxTableState(env, 
mutableColumns, indexMetaData.getAttributes(), timestamp, m, emptyColRef, 
result);
+                       generateDeletes(indexMetaData, indexUpdates, 
txRollbackAttribute, state);
+                       generatePuts(indexMetaData, indexUpdates, state);
+                   }
+               }
+       }
+
+       private long getTimestamp(byte[] txRollbackAttribute, long 
txnWritePointer, Mutation m) {
+               if (txRollbackAttribute==null) {
+                       return txnWritePointer;
+               }
+               // if this is a rollback generate mutations with the same 
timestamp as the data row mutation as the timestamp might be 
+        // different from the current txn write pointer because of check points
+               long mutationTimestamp = txnWritePointer;
+               for (Entry<byte[], List<Cell>> entry : 
m.getFamilyCellMap().entrySet()) {
+                       mutationTimestamp = 
entry.getValue().get(0).getTimestamp();
+                       break;
+               }
+               return mutationTimestamp;
+       }
+
+       private void generateDeletes(PhoenixIndexMetaData indexMetaData,
+                       Collection<Pair<Mutation, byte[]>> indexUpdates,
+                       byte[] attribValue, TxTableState state) throws 
IOException {
+               Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, 
indexMetaData);
+               for (IndexUpdate delete : deletes) {
+                   if (delete.isValid()) {
+                       
delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, 
attribValue);
+                       indexUpdates.add(new Pair<Mutation, 
byte[]>(delete.getUpdate(),delete.getTableName()));
+                   }
+               }
+       }
+
+       boolean generatePuts(
+                       PhoenixIndexMetaData indexMetaData,
+                       Collection<Pair<Mutation, byte[]>> indexUpdates,
+                       TxTableState state)
+                       throws IOException {
+               state.applyMutation();
+               Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, 
indexMetaData);
+               boolean validPut = false;
+               for (IndexUpdate put : puts) {
+                   if (put.isValid()) {
+                       indexUpdates.add(new Pair<Mutation, 
byte[]>(put.getUpdate(),put.getTableName()));
+                       validPut = true;
+                   }
+               }
+               return validPut;
+       }
+
 
     private static class TxTableState implements TableState {
         private final Mutation mutation;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index a073b84..35a8663 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -215,7 +215,8 @@ public class QueryOptimizer {
     
     private static QueryPlan addPlan(PhoenixStatement statement, 
SelectStatement select, PTable index, List<? extends PDatum> targetColumns, 
ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan, boolean 
isHinted) throws SQLException {
         int nColumns = dataPlan.getProjector().getColumnCount();
-        String alias = '"' + dataPlan.getTableRef().getTableAlias() + '"'; // 
double quote in case it's case sensitive
+        String tableAlias = dataPlan.getTableRef().getTableAlias();
+               String alias = tableAlias==null ? null : '"' + tableAlias + 
'"'; // double quote in case it's case sensitive
         String schemaName = index.getParentSchemaName().getString();
         schemaName = schemaName.length() == 0 ? null :  '"' + schemaName + '"';
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 5667029..b2982e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1228,7 +1228,7 @@ public class MetaDataClient {
             Long timestamp = null;
             if (parent != null) {
                 transactional = parent.isTransactional();
-                timestamp = TransactionUtil.getTableTimestamp(connection, 
transactional, null);
+                timestamp = TransactionUtil.getTableTimestamp(connection, 
transactional);
                 storeNulls = parent.getStoreNulls();
                 if (tableType == PTableType.INDEX) {
                     // Index on view
@@ -1376,12 +1376,15 @@ public class MetaDataClient {
             if (transactional) { // FIXME: remove once Tephra handles storing 
multiple versions of a cell value, 
                // and allows ignoring empty key values for an operation
                if (Boolean.FALSE.equals(storeNullsProp)) {
-                       throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.STORE_NULLS_MUST_BE_FALSE_FOR_TRANSACTIONAL)
+                       throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL)
                        .setSchemaName(schemaName).setTableName(tableName)
                        .build().buildException();
                }
+               // Force STORE_NULLS to true when transactional as Tephra 
cannot deal with column deletes
+               storeNulls = true;
+               tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, 
Boolean.TRUE);
             }
-            timestamp = timestamp==null ? 
TransactionUtil.getTableTimestamp(connection, transactional, null) : timestamp;
+            timestamp = timestamp==null ? 
TransactionUtil.getTableTimestamp(connection, transactional) : timestamp;
 
             // Delay this check as it is supported to have IMMUTABLE_ROWS and 
SALT_BUCKETS defined on views
             if (statement.getTableType() == PTableType.VIEW || indexId != 
null) {
@@ -1891,8 +1894,7 @@ public class MetaDataClient {
 
                 
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
             default:
-                               connection.removeTable(tenantId, 
SchemaUtil.getTableName(schemaName, tableName), parentTableName,
-                        TransactionUtil.getTableTimestamp(connection, 
transactional, result.getMutationTime()));
+                               connection.removeTable(tenantId, 
SchemaUtil.getTableName(schemaName, tableName), parentTableName, 
result.getMutationTime());
 
                 if (result.getTable() != null && tableType != PTableType.VIEW) 
{
                     connection.setAutoCommit(true);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index a32e922..b78a904 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -831,7 +831,7 @@ public class PTableImpl implements PTable {
 
     @Override
     public synchronized boolean getIndexMaintainers(ImmutableBytesWritable 
ptr, PhoenixConnection connection) {
-        if (indexMaintainersPtr == null) {
+        if (indexMaintainersPtr == null || indexMaintainersPtr.getLength()==0) 
{
             indexMaintainersPtr = new ImmutableBytesWritable();
             if (indexes.isEmpty()) {
                 indexMaintainersPtr.set(ByteUtil.EMPTY_BYTE_ARRAY);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 51428ea..62ec1f0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -233,9 +233,10 @@ public class IndexUtil {
     }
     
     public static List<Mutation> generateIndexData(final PTable table, PTable 
index,
-            List<Mutation> dataMutations, ImmutableBytesWritable ptr, final 
KeyValueBuilder kvBuilder, PhoenixConnection connection)
+            List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, 
PhoenixConnection connection)
             throws SQLException {
         try {
+               final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
             IndexMaintainer maintainer = index.getIndexMaintainer(table, 
connection);
             List<Mutation> indexMutations = 
Lists.newArrayListWithExpectedSize(dataMutations.size());
             for (final Mutation dataMutation : dataMutations) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 0998e72..c8c468d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -66,7 +67,9 @@ public class ServerUtil {
         } else if (t instanceof IOException) {
             // If the IOException does not wrap any exception, then bubble it 
up.
             Throwable cause = t.getCause();
-            if (cause == null || cause instanceof IOException) {
+            if (cause instanceof RetriesExhaustedWithDetailsException)
+               return new DoNotRetryIOException(t.getMessage(), cause);
+            else if (cause == null || cause instanceof IOException) {
                 return (IOException) t;
             }
             // Else assume it's been wrapped, so throw as 
DoNotRetryIOException to prevent client hanging while retrying

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 893a895..020ac3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -17,17 +17,8 @@
  */
 package org.apache.phoenix.util;
 
-import java.io.IOException;
 import java.sql.SQLException;
 
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionCodec;
-import co.cask.tephra.TransactionConflictException;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase98.TransactionAwareHTable;
-
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -36,16 +27,21 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
 
+import co.cask.tephra.TransactionConflictException;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
 public class TransactionUtil {
     private TransactionUtil() {
     }
     
    /**
     * Converts a wall-clock server timestamp (milliseconds) into Tephra's
     * transaction timestamp space by scaling with
     * {@code TxConstants.MAX_TX_PER_MS} (replaces the former literal 1000000).
     * NOTE(review): the method name says "nanoseconds" — presumably accurate only
     * if MAX_TX_PER_MS is 1,000,000; confirm against the Tephra constant.
     */
    public static long convertToNanoseconds(long serverTimeStamp) {
        return serverTimeStamp * TxConstants.MAX_TX_PER_MS;
    }
    
    /** Inverse of {@link #convertToNanoseconds(long)}: txn timestamp back to milliseconds. */
    public static long convertToMilliseconds(long serverTimeStamp) {
        return serverTimeStamp / TxConstants.MAX_TX_PER_MS;
    }
     
     public static SQLException getSQLException(TransactionFailureException e) {
@@ -67,10 +63,11 @@ public class TransactionUtil {
        return new TransactionAwareHTable(htable, table.isImmutableRows() ? 
TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
     }
     
-       public static long getResolvedTimestamp(PhoenixConnection connection, 
boolean isTransactional, Long defaultResolvedTimestamp) {
+    // we resolve transactional tables at the txn read pointer
+       public static long getResolvedTimestamp(PhoenixConnection connection, 
boolean isTransactional, long defaultResolvedTimestamp) {
                MutationState mutationState = connection.getMutationState();
                Long scn = connection.getSCN();
-           return scn != null ?  scn : (isTransactional && 
mutationState.isTransactionStarted()) ? 
convertToMillisecods(mutationState.getReadPointer()) : defaultResolvedTimestamp;
+           return scn != null ?  scn : (isTransactional && 
mutationState.isTransactionStarted()) ? 
convertToMilliseconds(mutationState.getReadPointer()) : 
defaultResolvedTimestamp;
        }
 
        public static long getResolvedTime(PhoenixConnection connection, 
MetaDataMutationResult result) {
@@ -79,21 +76,30 @@ public class TransactionUtil {
                return getResolvedTimestamp(connection, isTransactional, 
result.getMutationTime());
        }
 
-       public static long getTableTimestamp(PhoenixConnection connection, 
MetaDataMutationResult result) {
+       public static long getResolvedTimestamp(PhoenixConnection connection, 
MetaDataMutationResult result) {
                PTable table = result.getTable();
                MutationState mutationState = connection.getMutationState();
                boolean txInProgress = table != null && table.isTransactional() 
&& mutationState.isTransactionStarted();
-               return  txInProgress ? 
convertToMillisecods(mutationState.getReadPointer()) : result.getMutationTime();
+               return  txInProgress ? 
convertToMilliseconds(mutationState.getReadPointer()) : 
result.getMutationTime();
        }
 
-       public static Long getTableTimestamp(PhoenixConnection connection, 
boolean transactional, Long mutationTime) throws SQLException {
-               Long timestamp = mutationTime;
+       public static Long getTableTimestamp(PhoenixConnection connection, 
boolean transactional) throws SQLException {
+               Long timestamp = null;
+               if (!transactional) {
+                       return timestamp;
+               }
                MutationState mutationState = connection.getMutationState();
-               if (transactional && !mutationState.isTransactionStarted() && 
connection.getSCN()==null) {
+               // we need to burn a txn so that we are sure the txn read 
pointer is close to wall clock time
+               if (!mutationState.isTransactionStarted()) {
                        mutationState.startTransaction();
-                       timestamp = 
convertToMillisecods(mutationState.getReadPointer());
                        connection.commit();
                }
+               else {
+                       connection.commit();
+               }
+               mutationState.startTransaction();
+               timestamp = 
convertToMilliseconds(mutationState.getReadPointer());
+               connection.commit();
                return timestamp;
        }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index da965c8..9cbde9d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -134,6 +134,7 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.ConfigUtil;
+import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
@@ -178,7 +179,7 @@ import com.google.inject.util.Providers;
  *
  */
 public abstract class BaseTest {
-    private static final String TEST_TABLE_SCHEMA = "(" +
+    protected static final String TEST_TABLE_SCHEMA = "(" +
                        "   varchar_pk VARCHAR NOT NULL, " +
                        "   char_pk CHAR(6) NOT NULL, " +
                        "   int_pk INTEGER NOT NULL, "+ 
@@ -468,9 +469,9 @@ public abstract class BaseTest {
         return conf.get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
     }
     
-    private static String url;
+    protected static String url;
     protected static PhoenixTestDriver driver;
-    private static boolean clusterInitialized = false;
+    protected static boolean clusterInitialized = false;
     private static HBaseTestingUtility utility;
     protected static final Configuration config = HBaseConfiguration.create(); 
     
@@ -495,7 +496,7 @@ public abstract class BaseTest {
         
     }
     
-    private static void setupTxManager() throws SQLException, IOException {
+    protected static void setupTxManager() throws SQLException, IOException {
         config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
         config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, 
"n-times");
         config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
@@ -1729,7 +1730,81 @@ public abstract class BaseTest {
         return utility;
     }
 
-    protected static void createMultiCFTestTable(String tableName) throws 
SQLException {
+    // Populate the test table with data.
+       public static void populateTestTable(String fullTableName) throws 
SQLException {
+           Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+           try (Connection conn = DriverManager.getConnection(getUrl(), 
props)) {
+               String upsert = "UPSERT INTO " + fullTableName
+                       + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?)";
+               PreparedStatement stmt = conn.prepareStatement(upsert);
+               stmt.setString(1, "varchar1");
+               stmt.setString(2, "char1");
+               stmt.setInt(3, 1);
+               stmt.setLong(4, 1L);
+               stmt.setBigDecimal(5, new BigDecimal(1.0));
+               Date date = DateUtil.parseDate("2015-01-01 00:00:00");
+               stmt.setDate(6, date);
+               stmt.setString(7, "varchar_a");
+               stmt.setString(8, "chara");
+               stmt.setInt(9, 2);
+               stmt.setLong(10, 2L);
+               stmt.setBigDecimal(11, new BigDecimal(2.0));
+               stmt.setDate(12, date);
+               stmt.setString(13, "varchar_b");
+               stmt.setString(14, "charb");
+               stmt.setInt(15, 3);
+               stmt.setLong(16, 3L);
+               stmt.setBigDecimal(17, new BigDecimal(3.0));
+               stmt.setDate(18, date);
+               stmt.executeUpdate();
+               
+               stmt.setString(1, "varchar2");
+               stmt.setString(2, "char2");
+               stmt.setInt(3, 2);
+               stmt.setLong(4, 2L);
+               stmt.setBigDecimal(5, new BigDecimal(2.0));
+               date = DateUtil.parseDate("2015-01-02 00:00:00");
+               stmt.setDate(6, date);
+               stmt.setString(7, "varchar_a");
+               stmt.setString(8, "chara");
+               stmt.setInt(9, 3);
+               stmt.setLong(10, 3L);
+               stmt.setBigDecimal(11, new BigDecimal(3.0));
+               stmt.setDate(12, date);
+               stmt.setString(13, "varchar_b");
+               stmt.setString(14, "charb");
+               stmt.setInt(15, 4);
+               stmt.setLong(16, 4L);
+               stmt.setBigDecimal(17, new BigDecimal(4.0));
+               stmt.setDate(18, date);
+               stmt.executeUpdate();
+               
+               stmt.setString(1, "varchar3");
+               stmt.setString(2, "char3");
+               stmt.setInt(3, 3);
+               stmt.setLong(4, 3L);
+               stmt.setBigDecimal(5, new BigDecimal(3.0));
+               date = DateUtil.parseDate("2015-01-03 00:00:00");
+               stmt.setDate(6, date);
+               stmt.setString(7, "varchar_a");
+               stmt.setString(8, "chara");
+               stmt.setInt(9, 4);
+               stmt.setLong(10, 4L);
+               stmt.setBigDecimal(11, new BigDecimal(4.0));
+               stmt.setDate(12, date);
+               stmt.setString(13, "varchar_b");
+               stmt.setString(14, "charb");
+               stmt.setInt(15, 5);
+               stmt.setLong(16, 5L);
+               stmt.setBigDecimal(17, new BigDecimal(5.0));
+               stmt.setDate(18, date);
+               stmt.executeUpdate();
+               
+               conn.commit();
+           }
+       }
+
+       protected static void createMultiCFTestTable(String tableName, String 
options) throws SQLException {
         String ddl = "create table if not exists " + tableName + "(" +
                 "   varchar_pk VARCHAR NOT NULL, " +
                 "   char_pk CHAR(5) NOT NULL, " +
@@ -1747,7 +1822,8 @@ public abstract class BaseTest {
                 "   b.long_col2 BIGINT, " +
                 "   b.decimal_col2 DECIMAL, " +
                 "   b.date_col DATE " + 
-                "   CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, 
long_pk DESC, decimal_pk))";
+                "   CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, 
long_pk DESC, decimal_pk)) "
+                + (options!=null? options : "");
             Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
             Connection conn = DriverManager.getConnection(getUrl(), props);
             conn.createStatement().execute(ddl);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index afb6df3..1d10116 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
     <htrace.version>2.04</htrace.version>
     <collections.version>3.2.1</collections.version>
     <jodatime.version>2.3</jodatime.version>
-    <tephra.version>0.6.1</tephra.version>
+    <tephra.version>0.6.3-SNAPSHOT</tephra.version>
 
     <!-- Test Dependencies -->
     <mockito-all.version>1.8.5</mockito-all.version>

Reply via email to