PHOENIX-2478 Rows committed in transaction overlapping index creation are not populated


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/13699371
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/13699371
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/13699371

Branch: refs/heads/calcite
Commit: 13699371820928cf14e0e2c5bbffe338c7aa2e93
Parents: f591da4
Author: James Taylor <jtay...@salesforce.com>
Authored: Mon Jan 18 21:14:34 2016 -0800
Committer: James Taylor <jtay...@salesforce.com>
Committed: Mon Jan 18 21:14:34 2016 -0800

----------------------------------------------------------------------
 .../apache/phoenix/execute/MutationState.java   | 478 ++++++++++---------
 .../apache/phoenix/schema/MetaDataClient.java   |  31 +-
 2 files changed, 265 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/13699371/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index ee694e7..a6fe98d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -121,6 +121,7 @@ public class MutationState implements SQLCloseable {
     private static final Logger logger = 
LoggerFactory.getLogger(MutationState.class);
     private static final TransactionCodec CODEC = new TransactionCodec();
     private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0];
+    private static final int MAX_COMMIT_RETRIES = 3;
     
     private final PhoenixConnection connection;
     private final long maxSize;
@@ -160,37 +161,37 @@ public class MutationState implements SQLCloseable {
     }
     
     private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx, TransactionContext txContext, long sizeOffset) {
-       this(maxSize, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, 
txContext);
+        this(maxSize, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, 
txContext);
         this.sizeOffset = sizeOffset;
     }
     
-       MutationState(long maxSize, PhoenixConnection connection,
-                       Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
mutations,
-                       Transaction tx, TransactionContext txContext) {
-               this.maxSize = maxSize;
-               this.connection = connection;
-               this.mutations = mutations;
-               boolean isMetricsEnabled = 
connection.isRequestLevelMetricsEnabled();
-               this.mutationMetricQueue = isMetricsEnabled ? new 
MutationMetricQueue()
-                               : 
NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
-               this.tx = tx;
-               if (tx == null) {
+    MutationState(long maxSize, PhoenixConnection connection,
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
+            Transaction tx, TransactionContext txContext) {
+        this.maxSize = maxSize;
+        this.connection = connection;
+        this.mutations = mutations;
+        boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
+        this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
+                : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
+        this.tx = tx;
+        if (tx == null) {
             this.txAwares = Collections.emptyList();
-                   if (txContext == null) {
-                       TransactionSystemClient txServiceClient = 
this.connection
-                                       
.getQueryServices().getTransactionSystemClient();
-                       this.txContext = new 
TransactionContext(txServiceClient);
-                   } else {
-                       isExternalTxContext = true;
-                       this.txContext = txContext;
-                   }
-               } else {
-                       // this code path is only used while running child 
scans, we can't pass the txContext to child scans
-                       // as it is not thread safe, so we use the tx member 
variable
-                       this.txAwares = Lists.newArrayList();
-                       this.txContext = null;
-               }
-       }
+            if (txContext == null) {
+                TransactionSystemClient txServiceClient = this.connection
+                        .getQueryServices().getTransactionSystemClient();
+                this.txContext = new TransactionContext(txServiceClient);
+            } else {
+                isExternalTxContext = true;
+                this.txContext = txContext;
+            }
+        } else {
+            // this code path is only used while running child scans, we can't 
pass the txContext to child scans
+            // as it is not thread safe, so we use the tx member variable
+            this.txAwares = Lists.newArrayList();
+            this.txContext = null;
+        }
+    }
 
     public MutationState(TableRef table, 
Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long 
maxSize, PhoenixConnection connection) {
         this(maxSize, connection, null, null, sizeOffset);
@@ -217,9 +218,11 @@ public class MutationState implements SQLCloseable {
     public void commitWriteFence(PTable dataTable) throws SQLException {
         if (dataTable.isTransactional()) {
             byte[] key = SchemaUtil.getTableKey(dataTable);
+            boolean success = false;
             try {
                 FenceWait fenceWait = VisibilityFence.prepareWait(key, 
connection.getQueryServices().getTransactionSystemClient());
                 fenceWait.await(10000, TimeUnit.MILLISECONDS);
+                success = true;
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
@@ -235,6 +238,7 @@ public class MutationState implements SQLCloseable {
                 // TODO: seems like an autonomous tx capability in Tephra 
would be useful here.
                 try {
                     txContext.start();
+                    if (logger.isInfoEnabled() && success) logger.info("Added 
write fence at ~" + getTransaction().getReadPointer());
                 } catch (TransactionFailureException e) {
                     throw TransactionUtil.getTransactionFailureException(e);
                 }
@@ -306,18 +310,18 @@ public class MutationState implements SQLCloseable {
             }
             if (hasUncommittedData) {
                 try {
-                       if (txContext == null) {
-                               currentTx = tx = 
connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
-                       }  else {
-                               txContext.checkpoint();
-                               currentTx = tx = 
txContext.getCurrentTransaction();
-                       }
+                    if (txContext == null) {
+                        currentTx = tx = 
connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
+                    }  else {
+                        txContext.checkpoint();
+                        currentTx = tx = txContext.getCurrentTransaction();
+                    }
                     // Since we've checkpointed, we can clear out uncommitted 
set, since a statement run afterwards
                     // should see all this data.
                     uncommittedPhysicalNames.clear();
                 } catch (TransactionFailureException e) {
                     throw new SQLException(e);
-                               } 
+                } 
             }
             // Since we're querying our own table while mutating it, we must 
exclude
             // see our current mutations, otherwise we can get erroneous 
results (for DELETE)
@@ -356,7 +360,7 @@ public class MutationState implements SQLCloseable {
     }
     
     public PhoenixConnection getConnection() {
-       return connection;
+        return connection;
     }
     
     // Kept private as the Transaction may change when check pointed. Keeping 
it private ensures
@@ -366,7 +370,7 @@ public class MutationState implements SQLCloseable {
     }
     
     public boolean isTransactionStarted() {
-       return getTransaction() != null;
+        return getTransaction() != null;
     }
     
     public long getInitialWritePointer() {
@@ -391,11 +395,11 @@ public class MutationState implements SQLCloseable {
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
         }
         
-               if (connection.getSCN() != null) {
-                       throw new SQLExceptionInfo.Builder(
-                                       
SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
-                                       .build().buildException();
-               }
+        if (connection.getSCN() != null) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
+                    .build().buildException();
+        }
         
         try {
             if (!isTransactionStarted()) {
@@ -460,9 +464,9 @@ public class MutationState implements SQLCloseable {
                 // Loop through new rows and replace existing with new
                 for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : 
entry.getValue().entrySet()) {
                     // Replace existing row with new row
-                       RowMutationState existingRowMutationState = 
existingRows.put(rowEntry.getKey(), rowEntry.getValue());
+                    RowMutationState existingRowMutationState = 
existingRows.put(rowEntry.getKey(), rowEntry.getValue());
                     if (existingRowMutationState != null) {
-                       Map<PColumn,byte[]> existingValues = 
existingRowMutationState.getColumnValues();
+                        Map<PColumn,byte[]> existingValues = 
existingRowMutationState.getColumnValues();
                         if (existingValues != PRow.DELETE_MARKER) {
                             Map<PColumn,byte[]> newRow = 
rowEntry.getValue().getColumnValues();
                             // if new row is PRow.DELETE_MARKER, it means 
delete, and we don't need to merge it with existing row. 
@@ -502,7 +506,7 @@ public class MutationState implements SQLCloseable {
     }
     
 
-       private static ImmutableBytesPtr 
getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable 
table) {
+    private static ImmutableBytesPtr 
getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable 
table) {
         RowKeySchema schema = table.getRowKeySchema();
         int rowTimestampColPos = table.getRowTimestampColPos();
         Field rowTimestampField = schema.getField(rowTimestampColPos); 
@@ -523,7 +527,7 @@ public class MutationState implements SQLCloseable {
         return ptr;
     }
     
-       private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final 
TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final 
long timestamp, boolean includeMutableIndexes, final boolean sendAll) { 
+    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final 
TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final 
long timestamp, boolean includeMutableIndexes, final boolean sendAll) { 
         final PTable table = tableRef.getTable();
         final Iterator<PTable> indexes = // Only maintain tables with 
immutable rows through this client-side mechanism
                 (table.isImmutableRows() || includeMutableIndexes) ? 
@@ -554,13 +558,13 @@ public class MutationState implements SQLCloseable {
                                 connection.getKeyValueBuilder(), connection);
                     // we may also have to include delete mutations for 
immutable tables if we are not processing all the tables in the mutations map
                     if (!sendAll) {
-                           TableRef key = new TableRef(index);
-                                               Map<ImmutableBytesPtr, 
RowMutationState> rowToColumnMap = mutations.remove(key);
-                           if (rowToColumnMap!=null) {
-                                   final List<Mutation> deleteMutations = 
Lists.newArrayList();
-                                   generateMutations(tableRef, timestamp, 
rowToColumnMap, deleteMutations, null);
-                                   indexMutations.addAll(deleteMutations);
-                           }
+                        TableRef key = new TableRef(index);
+                        Map<ImmutableBytesPtr, RowMutationState> 
rowToColumnMap = mutations.remove(key);
+                        if (rowToColumnMap!=null) {
+                            final List<Mutation> deleteMutations = 
Lists.newArrayList();
+                            generateMutations(tableRef, timestamp, 
rowToColumnMap, deleteMutations, null);
+                            indexMutations.addAll(deleteMutations);
+                        }
                     }
                 } catch (SQLException e) {
                     throw new IllegalDataException(e);
@@ -676,32 +680,32 @@ public class MutationState implements SQLCloseable {
         };
     }
         
-       /**
-        * Validates that the meta data is valid against the server meta data 
if we haven't yet done so.
-        * Otherwise, for every UPSERT VALUES call, we'd need to hit the server 
to see if the meta data
-        * has changed.
-        * @param connection
-        * @return the server time to use for the upsert
-        * @throws SQLException if the table or any columns no longer exist
-        */
-       private long[] validateAll() throws SQLException {
-           int i = 0;
-           long[] timeStamps = new long[this.mutations.size()];
-           for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
entry : mutations.entrySet()) {
-               TableRef tableRef = entry.getKey();
-               timeStamps[i++] = validate(tableRef, entry.getValue());
-           }
-           return timeStamps;
-       }
-       
-       private long validate(TableRef tableRef, Map<ImmutableBytesPtr, 
RowMutationState> rowKeyToColumnMap) throws SQLException {
-           Long scn = connection.getSCN();
-           MetaDataClient client = new MetaDataClient(connection);
-           long serverTimeStamp = tableRef.getTimeStamp();
-           // If we're auto committing, we've already validated the schema 
when we got the ColumnResolver,
-           // so no need to do it again here.
-           if (!connection.getAutoCommit()) {
-               PTable table = tableRef.getTable();
+    /**
+     * Validates that the meta data is valid against the server meta data if 
we haven't yet done so.
+     * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to 
see if the meta data
+     * has changed.
+     * @param connection
+     * @return the server time to use for the upsert
+     * @throws SQLException if the table or any columns no longer exist
+     */
+    private long[] validateAll() throws SQLException {
+        int i = 0;
+        long[] timeStamps = new long[this.mutations.size()];
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
entry : mutations.entrySet()) {
+            TableRef tableRef = entry.getKey();
+            timeStamps[i++] = validate(tableRef, entry.getValue());
+        }
+        return timeStamps;
+    }
+    
+    private long validate(TableRef tableRef, Map<ImmutableBytesPtr, 
RowMutationState> rowKeyToColumnMap) throws SQLException {
+        Long scn = connection.getSCN();
+        MetaDataClient client = new MetaDataClient(connection);
+        long serverTimeStamp = tableRef.getTimeStamp();
+        // If we're auto committing, we've already validated the schema when 
we got the ColumnResolver,
+        // so no need to do it again here.
+        if (!connection.getAutoCommit()) {
+            PTable table = tableRef.getTable();
             MetaDataMutationResult result = 
client.updateCache(table.getSchemaName().getString(), 
table.getTableName().getString());
             PTable resolvedTable = result.getTable();
             if (resolvedTable == null) {
@@ -717,14 +721,14 @@ public class MutationState implements SQLCloseable {
                     // TODO: use bitset?
                     PColumn[] columns = new 
PColumn[resolvedTable.getColumns().size()];
                     for (Map.Entry<ImmutableBytesPtr,RowMutationState> 
rowEntry : rowKeyToColumnMap.entrySet()) {
-                       RowMutationState valueEntry = rowEntry.getValue();
+                        RowMutationState valueEntry = rowEntry.getValue();
                         if (valueEntry != null) {
-                               Map<PColumn, byte[]> colValues = 
valueEntry.getColumnValues();
-                               if (colValues != PRow.DELETE_MARKER) {
+                            Map<PColumn, byte[]> colValues = 
valueEntry.getColumnValues();
+                            if (colValues != PRow.DELETE_MARKER) {
                                 for (PColumn column : colValues.keySet()) {
                                     columns[column.getPosition()] = column;
                                 }
-                               }
+                            }
                         }
                     }
                     for (PColumn column : columns) {
@@ -735,8 +739,8 @@ public class MutationState implements SQLCloseable {
                 }
             }
         }
-           return scn == null ? serverTimeStamp == 
QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp 
: scn;
-       }
+        return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP 
? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
+    }
     
     private static long calculateMutationSize(List<Mutation> mutations) {
         long byteSize = 0;
@@ -845,74 +849,74 @@ public class MutationState implements SQLCloseable {
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing 
mutations to tables")) {
             Span span = trace.getSpan();
-               ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable();
-               boolean isTransactional;
-               while (tableRefIterator.hasNext()) {
-                       // at this point we are going through mutations for 
each table
-                   final TableRef tableRef = tableRefIterator.next();
-                   valuesMap = mutations.get(tableRef);
-                   if (valuesMap == null || valuesMap.isEmpty()) {
-                       continue;
-                   }
+            ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable();
+            boolean isTransactional;
+            while (tableRefIterator.hasNext()) {
+                // at this point we are going through mutations for each table
+                final TableRef tableRef = tableRefIterator.next();
+                valuesMap = mutations.get(tableRef);
+                if (valuesMap == null || valuesMap.isEmpty()) {
+                    continue;
+                }
                 // Validate as we go if transactional since we can undo if a 
problem occurs (which is unlikely)
                 long serverTimestamp = serverTimeStamps == null ? 
validate(tableRef, valuesMap) : serverTimeStamps[i++];
-                   final PTable table = tableRef.getTable();
-                   // Track tables to which we've sent uncommitted data
-                   if (isTransactional = table.isTransactional()) {
-                       txTableRefs.add(tableRef);
-                       
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
-                   }
-                   boolean isDataTable = true;
+                final PTable table = tableRef.getTable();
+                // Track tables to which we've sent uncommitted data
+                if (isTransactional = table.isTransactional()) {
+                    txTableRefs.add(tableRef);
+                    
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+                }
+                boolean isDataTable = true;
                 table.getIndexMaintainers(indexMetaDataPtr, connection);
-                   Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
-                   while (mutationsIterator.hasNext()) {
-                       Pair<byte[],List<Mutation>> pair = 
mutationsIterator.next();
-                       byte[] htableName = pair.getFirst();
-                       List<Mutation> mutationList = pair.getSecond();
-                       
-                       //create a span per target table
-                       //TODO maybe we can be smarter about the table name to 
string here?
-                       Span child = Tracing.child(span,"Writing mutation batch 
for table: "+Bytes.toString(htableName));
-       
-                       int retryCount = 0;
-                       boolean shouldRetry = false;
-                       do {
-                           final ServerCache cache = isDataTable ? 
setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr) : null;
-                       
-                           // If we haven't retried yet, retry for this case 
only, as it's possible that
-                           // a split will occur after we send the index 
metadata cache to all known
-                           // region servers.
-                           shouldRetry = cache != null;
-                           SQLException sqlE = null;
-                           HTableInterface hTable = 
connection.getQueryServices().getTable(htableName);
-                           try {
-                               if (isTransactional) {
-                                   // If we have indexes, wrap the HTable in a 
delegate HTable that
-                                   // will attach the necessary index meta 
data in the event of a
-                                   // rollback
-                                   if (!table.getIndexes().isEmpty()) {
-                                       hTable = new 
MetaDataAwareHTable(hTable, tableRef);
-                                   }
-                                   TransactionAwareHTable txnAware = 
TransactionUtil.getTransactionAwareHTable(hTable, table);
-                                   // Don't add immutable indexes (those are 
the only ones that would participate
-                                   // during a commit), as we don't need 
conflict detection for these.
-                                   if (isDataTable) {
-                                       // Even for immutable, we need to do 
this so that an abort has the state
-                                       // necessary to generate the rows to 
delete.
-                                       addTransactionParticipant(txnAware);
-                                   } else {
-                                       txnAware.startTx(getTransaction());
-                                   }
-                                   hTable = txnAware;
-                               }
-                               long numMutations = mutationList.size();
+                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
+                while (mutationsIterator.hasNext()) {
+                    Pair<byte[],List<Mutation>> pair = 
mutationsIterator.next();
+                    byte[] htableName = pair.getFirst();
+                    List<Mutation> mutationList = pair.getSecond();
+                    
+                    //create a span per target table
+                    //TODO maybe we can be smarter about the table name to 
string here?
+                    Span child = Tracing.child(span,"Writing mutation batch 
for table: "+Bytes.toString(htableName));
+    
+                    int retryCount = 0;
+                    boolean shouldRetry = false;
+                    do {
+                        final ServerCache cache = isDataTable ? 
setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr) : null;
+                    
+                        // If we haven't retried yet, retry for this case 
only, as it's possible that
+                        // a split will occur after we send the index metadata 
cache to all known
+                        // region servers.
+                        shouldRetry = cache != null;
+                        SQLException sqlE = null;
+                        HTableInterface hTable = 
connection.getQueryServices().getTable(htableName);
+                        try {
+                            if (isTransactional) {
+                                // If we have indexes, wrap the HTable in a 
delegate HTable that
+                                // will attach the necessary index meta data 
in the event of a
+                                // rollback
+                                if (!table.getIndexes().isEmpty()) {
+                                    hTable = new MetaDataAwareHTable(hTable, 
tableRef);
+                                }
+                                TransactionAwareHTable txnAware = 
TransactionUtil.getTransactionAwareHTable(hTable, table);
+                                // Don't add immutable indexes (those are the 
only ones that would participate
+                                // during a commit), as we don't need conflict 
detection for these.
+                                if (isDataTable) {
+                                    // Even for immutable, we need to do this 
so that an abort has the state
+                                    // necessary to generate the rows to 
delete.
+                                    addTransactionParticipant(txnAware);
+                                } else {
+                                    txnAware.startTx(getTransaction());
+                                }
+                                hTable = txnAware;
+                            }
+                            long numMutations = mutationList.size();
                             GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                             
                             long startTime = System.currentTimeMillis();
                             child.addTimelineAnnotation("Attempt " + 
retryCount);
-                               hTable.batch(mutationList);
-                               child.stop();
-                               child.stop();
+                            hTable.batch(mutationList);
+                            child.stop();
+                            child.stop();
                             shouldRetry = false;
                             long mutationCommitTime = 
System.currentTimeMillis() - startTime;
                             
GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
@@ -920,80 +924,80 @@ public class MutationState implements SQLCloseable {
                             long mutationSizeBytes = 
calculateMutationSize(mutationList);
                             MutationMetric mutationsMetric = new 
MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
                             
mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), 
mutationsMetric);
-                           } catch (Exception e) {
-                               SQLException inferredE = 
ServerUtil.parseServerExceptionOrNull(e);
-                               if (inferredE != null) {
-                                   if (shouldRetry && retryCount == 0 && 
inferredE.getErrorCode() == 
SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
-                                       // Swallow this exception once, as it's 
possible that we split after sending the index metadata
-                                       // and one of the region servers 
doesn't have it. This will cause it to have it the next go around.
-                                       // If it fails again, we don't retry.
-                                       String msg = "Swallowing exception and 
retrying after clearing meta cache on connection. " + inferredE;
-                                       
logger.warn(LogUtil.addCustomAnnotations(msg, connection));
-                                       
connection.getQueryServices().clearTableRegionCache(htableName);
-       
-                                       // add a new child span as this one 
failed
-                                       child.addTimelineAnnotation(msg);
-                                       child.stop();
-                                       child = Tracing.child(span,"Failed 
batch, attempting retry");
-       
-                                       continue;
-                                   }
-                                   e = inferredE;
-                               }
-                               // Throw to client an exception that indicates 
the statements that
-                               // were not committed successfully.
-                               sqlE = new CommitException(e, 
getUncommittedStatementIndexes());
-                           } finally {
-                               try {
-                                   if (cache != null) {
-                                       cache.close();
-                                   }
-                               } finally {
-                                   try {
-                                       hTable.close();
-                                   } 
-                                   catch (IOException e) {
-                                       if (sqlE != null) {
-                                           
sqlE.setNextException(ServerUtil.parseServerException(e));
-                                       } else {
-                                           sqlE = 
ServerUtil.parseServerException(e);
-                                       }
-                                   } 
-                                   if (sqlE != null) {
-                                       throw sqlE;
-                                   }
-                               }
-                           }
-                       } while (shouldRetry && retryCount++ < 1);
-                       isDataTable = false;
-                   }
-                   if (tableRef.getTable().getType() != PTableType.INDEX) {
-                       numRows -= valuesMap.size();
-                   }
-                   // For transactions, track the statement indexes as we send 
data
-                   // over because our CommitException should include all 
statements
-                   // involved in the transaction since none of them would 
have been
-                   // committed in the event of a failure.
-                   if (isTransactional) {
-                       addUncommittedStatementIndexes(valuesMap.values());
-                       if (txMutations == null) {
-                           txMutations = 
Maps.newHashMapWithExpectedSize(mutations.size());
-                       }
-                       // Keep all mutations we've encountered until a commit 
or rollback.
-                       // This is not ideal, but there's not good way to get 
the values back
-                       // in the event that we need to replay the commit.
-                       txMutations.put(tableRef, valuesMap);
-                   }
+                        } catch (Exception e) {
+                            SQLException inferredE = 
ServerUtil.parseServerExceptionOrNull(e);
+                            if (inferredE != null) {
+                                if (shouldRetry && retryCount == 0 && 
inferredE.getErrorCode() == 
SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+                                    // Swallow this exception once, as it's 
possible that we split after sending the index metadata
+                                    // and one of the region servers doesn't 
have it. This will cause it to have it the next go around.
+                                    // If it fails again, we don't retry.
+                                    String msg = "Swallowing exception and 
retrying after clearing meta cache on connection. " + inferredE;
+                                    
logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+                                    
connection.getQueryServices().clearTableRegionCache(htableName);
+    
+                                    // add a new child span as this one failed
+                                    child.addTimelineAnnotation(msg);
+                                    child.stop();
+                                    child = Tracing.child(span,"Failed batch, 
attempting retry");
+    
+                                    continue;
+                                }
+                                e = inferredE;
+                            }
+                            // Throw to client an exception that indicates the 
statements that
+                            // were not committed successfully.
+                            sqlE = new CommitException(e, 
getUncommittedStatementIndexes());
+                        } finally {
+                            try {
+                                if (cache != null) {
+                                    cache.close();
+                                }
+                            } finally {
+                                try {
+                                    hTable.close();
+                                } 
+                                catch (IOException e) {
+                                    if (sqlE != null) {
+                                        
sqlE.setNextException(ServerUtil.parseServerException(e));
+                                    } else {
+                                        sqlE = 
ServerUtil.parseServerException(e);
+                                    }
+                                } 
+                                if (sqlE != null) {
+                                    throw sqlE;
+                                }
+                            }
+                        }
+                    } while (shouldRetry && retryCount++ < 1);
+                    isDataTable = false;
+                }
+                if (tableRef.getTable().getType() != PTableType.INDEX) {
+                    numRows -= valuesMap.size();
+                }
+                // For transactions, track the statement indexes as we send 
data
+                // over because our CommitException should include all 
statements
+                // involved in the transaction since none of them would have 
been
+                // committed in the event of a failure.
+                if (isTransactional) {
+                    addUncommittedStatementIndexes(valuesMap.values());
+                    if (txMutations == null) {
+                        txMutations = 
Maps.newHashMapWithExpectedSize(mutations.size());
+                    }
+                    // Keep all mutations we've encountered until a commit or 
rollback.
+                    // This is not ideal, but there's no good way to get the 
values back
+                    // in the event that we need to replay the commit.
+                    txMutations.put(tableRef, valuesMap);
+                }
                 // Remove batches as we process them
-                   if (sendAll) {
-                       // Iterating through map key set in this case, so we 
cannot use
-                       // the remove method without getting a concurrent 
modification
-                       // exception.
-                       tableRefIterator.remove();
-                   } else {
-                       mutations.remove(tableRef);
-                   }
-               }
+                if (sendAll) {
+                    // Iterating through map key set in this case, so we 
cannot use
+                    // the remove method without getting a concurrent 
modification
+                    // exception.
+                    tableRefIterator.remove();
+                } else {
+                    mutations.remove(tableRef);
+                }
+            }
         }
     }
 
@@ -1006,7 +1010,7 @@ public class MutationState implements SQLCloseable {
     }
     
     public static Transaction decodeTransaction(byte[] txnBytes) throws 
IOException {
-       return (txnBytes == null || txnBytes.length==0) ? null : 
CODEC.decode(txnBytes);
+        return (txnBytes == null || txnBytes.length==0) ? null : 
CODEC.decode(txnBytes);
     }
 
     private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? 
extends Mutation> mutations,
@@ -1059,10 +1063,10 @@ public class MutationState implements SQLCloseable {
     }
     
     private int[] getUncommittedStatementIndexes() {
-       for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : 
mutations.values()) {
-           addUncommittedStatementIndexes(rowMutationMap.values());
-       }
-       return uncommittedStatementIndexes;
+        for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : 
mutations.values()) {
+            addUncommittedStatementIndexes(rowMutationMap.values());
+        }
+        return uncommittedStatementIndexes;
     }
     
     @Override
@@ -1101,9 +1105,9 @@ public class MutationState implements SQLCloseable {
         Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = 
Collections.emptyMap();
         int retryCount = 0;
         do {
-               boolean sendSuccessful=false;
-               boolean retryCommit = false;
-               SQLException sqlE = null;
+            boolean sendSuccessful=false;
+            boolean retryCommit = false;
+            SQLException sqlE = null;
             try {
                 send();
                 txMutations = this.txMutations;
@@ -1121,7 +1125,8 @@ public class MutationState implements SQLCloseable {
                                 finishSuccessful = true;
                             }
                         } catch (TransactionFailureException e) {
-                            retryCommit = (e instanceof 
TransactionConflictException && retryCount == 0);
+                            if (logger.isInfoEnabled()) 
logger.info(e.getClass().getName() + " at timestamp " + 
getInitialWritePointer() + " with retry count of " + retryCount);
+                            retryCommit = (e instanceof 
TransactionConflictException && retryCount < MAX_COMMIT_RETRIES);
                             txFailure = e;
                             SQLException nextE = 
TransactionUtil.getTransactionFailureException(e);
                             if (sqlE == null) {
@@ -1134,7 +1139,9 @@ public class MutationState implements SQLCloseable {
                             if (!finishSuccessful) {
                                 try {
                                     txContext.abort(txFailure);
+                                    if (logger.isInfoEnabled()) 
logger.info("Abort successful");
                                 } catch (TransactionFailureException e) {
+                                    if (logger.isInfoEnabled()) 
logger.info("Abort failed with " + e);
                                     SQLException nextE = 
TransactionUtil.getTransactionFailureException(e);
                                     if (sqlE == null) {
                                         sqlE = nextE;
@@ -1151,8 +1158,15 @@ public class MutationState implements SQLCloseable {
                     } finally {
                         if (retryCommit) {
                             startTransaction();
+                            // Add back read fences
+                            Set<TableRef> txTableRefs = txMutations.keySet();
+                            for (TableRef tableRef : txTableRefs) {
+                                PTable dataTable = tableRef.getTable();
+                                addReadFence(dataTable);
+                            }
                             try {
-                                retryCommit = 
wasIndexAdded(txMutations.keySet());
+                                // Only retry if an index was added
+                                retryCommit = wasIndexAdded(txTableRefs);
                             } catch (SQLException e) {
                                 retryCommit = false;
                                 if (sqlE == null) {
@@ -1173,7 +1187,9 @@ public class MutationState implements SQLCloseable {
                 break;
             }
             retryCount++;
-            mutations.putAll(txMutations);
+            if (txMutations != null) {
+                mutations.putAll(txMutations);
+            }
         } while (true);
     }
 
@@ -1183,6 +1199,7 @@ public class MutationState implements SQLCloseable {
      * @throws SQLException 
      */
     private boolean wasIndexAdded(Set<TableRef> txTableRefs) throws 
SQLException {
+        if (logger.isInfoEnabled()) logger.info("Checking for index updates as 
of "  + getInitialWritePointer());
         MetaDataClient client = new MetaDataClient(connection);
         PMetaData cache = connection.getMetaDataCache();
         boolean addedIndexes = false;
@@ -1198,6 +1215,7 @@ public class MutationState implements SQLCloseable {
                 throw new 
TableNotFoundException(dataTable.getSchemaName().getString(), 
dataTable.getTableName().getString());
             }
             if (!result.wasUpdated()) {
+                if (logger.isInfoEnabled()) logger.info("No updates to " + 
dataTable.getName().getString() + " as of "  + timestamp);
                 continue;
             }
             if (!addedIndexes) {
@@ -1205,8 +1223,10 @@ public class MutationState implements SQLCloseable {
                 // that an index was dropped and recreated with the same name 
but different
                 // indexed/covered columns.
                 addedIndexes = 
(!oldIndexes.equals(result.getTable().getIndexes()));
+                if (logger.isInfoEnabled()) logger.info((addedIndexes ? 
"Updates " : "No updates ") + "as of "  + timestamp + " to " + 
dataTable.getName().getString() + " with indexes " + dataTable.getIndexes());
             }
         }
+        if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : 
"No updates ") + "to indexes as of "  + getInitialWritePointer());
         return addedIndexes;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/13699371/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ee212ed..d134f08 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -479,8 +479,8 @@ public class MetaDataClient {
         // Do not make rpc to getTable if 
         // 1. table is a system table
         // 2. table was already resolved as of that timestamp
-               if (table != null && !alwaysHitServer
-                               && (systemTable || resolvedTimestamp == 
tableResolvedTimestamp || connection.getMetaDataCache().getAge(tableRef) < 
table.getUpdateCacheFrequency())) {
+        if (table != null && !alwaysHitServer
+                && (systemTable || resolvedTimestamp == tableResolvedTimestamp 
|| connection.getMetaDataCache().getAge(tableRef) < 
table.getUpdateCacheFrequency())) {
             return new 
MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 
QueryConstants.UNSET_TIMESTAMP, table);
         }
 
@@ -1383,6 +1383,7 @@ public class MetaDataClient {
             return new MutationState(0,connection);
         }
 
+        if (logger.isInfoEnabled()) logger.info("Created index " + 
table.getName().getString() + " at " + table.getTimeStamp());
         // In async process, we return immediately as the MR job needs to be 
triggered .
         if(statement.isAsync()) {
             return new MutationState(0, connection);
@@ -2887,19 +2888,19 @@ public class MetaDataClient {
                     String fullTableName = SchemaUtil.getTableName(schemaName, 
tableName);
                     long resolvedTimeStamp = 
TransactionUtil.getResolvedTime(connection, result);
                     if (table.getIndexes().isEmpty() || (numPkColumnsAdded==0 
&& !nonTxToTx)) {
-                                               connection.addColumn(
-                                                               tenantId,
-                                                               fullTableName,
-                                                               columns,
-                                                               
result.getMutationTime(),
-                                                               seqNum,
-                                                               isImmutableRows 
== null ? table.isImmutableRows() : isImmutableRows,
-                                                               disableWAL == 
null ? table.isWALDisabled() : disableWAL,
-                                                               multiTenant == 
null ? table.isMultiTenant() : multiTenant,
-                                                               storeNulls == 
null ? table.getStoreNulls() : storeNulls, 
-                                                               isTransactional 
== null ? table.isTransactional() : isTransactional,
-                                                               
updateCacheFrequency == null ? table.getUpdateCacheFrequency() : 
updateCacheFrequency,
-                                                               
resolvedTimeStamp);
+                        connection.addColumn(
+                                tenantId,
+                                fullTableName,
+                                columns,
+                                result.getMutationTime(),
+                                seqNum,
+                                isImmutableRows == null ? 
table.isImmutableRows() : isImmutableRows,
+                                disableWAL == null ? table.isWALDisabled() : 
disableWAL,
+                                multiTenant == null ? table.isMultiTenant() : 
multiTenant,
+                                storeNulls == null ? table.getStoreNulls() : 
storeNulls, 
+                                isTransactional == null ? 
table.isTransactional() : isTransactional,
+                                updateCacheFrequency == null ? 
table.getUpdateCacheFrequency() : updateCacheFrequency,
+                                resolvedTimeStamp);
                     } else if (updateCacheFrequency != null) {
                         // Force removal from cache as the update cache 
frequency has changed
                         // Note that clients outside this JVM won't be 
affected.

Reply via email to