PHOENIX-2478 Rows committed in transaction overlapping index creation are not populated
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/13699371 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/13699371 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/13699371 Branch: refs/heads/calcite Commit: 13699371820928cf14e0e2c5bbffe338c7aa2e93 Parents: f591da4 Author: James Taylor <jtay...@salesforce.com> Authored: Mon Jan 18 21:14:34 2016 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Mon Jan 18 21:14:34 2016 -0800 ---------------------------------------------------------------------- .../apache/phoenix/execute/MutationState.java | 478 ++++++++++--------- .../apache/phoenix/schema/MetaDataClient.java | 31 +- 2 files changed, 265 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/13699371/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index ee694e7..a6fe98d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -121,6 +121,7 @@ public class MutationState implements SQLCloseable { private static final Logger logger = LoggerFactory.getLogger(MutationState.class); private static final TransactionCodec CODEC = new TransactionCodec(); private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0]; + private static final int MAX_COMMIT_RETRIES = 3; private final PhoenixConnection connection; private final long maxSize; @@ -160,37 +161,37 @@ public class MutationState implements SQLCloseable { } private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, 
TransactionContext txContext, long sizeOffset) { - this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext); + this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext); this.sizeOffset = sizeOffset; } - MutationState(long maxSize, PhoenixConnection connection, - Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, - Transaction tx, TransactionContext txContext) { - this.maxSize = maxSize; - this.connection = connection; - this.mutations = mutations; - boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled(); - this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue() - : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE; - this.tx = tx; - if (tx == null) { + MutationState(long maxSize, PhoenixConnection connection, + Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, + Transaction tx, TransactionContext txContext) { + this.maxSize = maxSize; + this.connection = connection; + this.mutations = mutations; + boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled(); + this.mutationMetricQueue = isMetricsEnabled ? 
new MutationMetricQueue() + : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE; + this.tx = tx; + if (tx == null) { this.txAwares = Collections.emptyList(); - if (txContext == null) { - TransactionSystemClient txServiceClient = this.connection - .getQueryServices().getTransactionSystemClient(); - this.txContext = new TransactionContext(txServiceClient); - } else { - isExternalTxContext = true; - this.txContext = txContext; - } - } else { - // this code path is only used while running child scans, we can't pass the txContext to child scans - // as it is not thread safe, so we use the tx member variable - this.txAwares = Lists.newArrayList(); - this.txContext = null; - } - } + if (txContext == null) { + TransactionSystemClient txServiceClient = this.connection + .getQueryServices().getTransactionSystemClient(); + this.txContext = new TransactionContext(txServiceClient); + } else { + isExternalTxContext = true; + this.txContext = txContext; + } + } else { + // this code path is only used while running child scans, we can't pass the txContext to child scans + // as it is not thread safe, so we use the tx member variable + this.txAwares = Lists.newArrayList(); + this.txContext = null; + } + } public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { this(maxSize, connection, null, null, sizeOffset); @@ -217,9 +218,11 @@ public class MutationState implements SQLCloseable { public void commitWriteFence(PTable dataTable) throws SQLException { if (dataTable.isTransactional()) { byte[] key = SchemaUtil.getTableKey(dataTable); + boolean success = false; try { FenceWait fenceWait = VisibilityFence.prepareWait(key, connection.getQueryServices().getTransactionSystemClient()); fenceWait.await(10000, TimeUnit.MILLISECONDS); + success = true; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException(); @@ -235,6 +238,7 @@ public class MutationState implements SQLCloseable { // TODO: seems like an autonomous tx capability in Tephra would be useful here. try { txContext.start(); + if (logger.isInfoEnabled() && success) logger.info("Added write fence at ~" + getTransaction().getReadPointer()); } catch (TransactionFailureException e) { throw TransactionUtil.getTransactionFailureException(e); } @@ -306,18 +310,18 @@ public class MutationState implements SQLCloseable { } if (hasUncommittedData) { try { - if (txContext == null) { - currentTx = tx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx); - } else { - txContext.checkpoint(); - currentTx = tx = txContext.getCurrentTransaction(); - } + if (txContext == null) { + currentTx = tx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx); + } else { + txContext.checkpoint(); + currentTx = tx = txContext.getCurrentTransaction(); + } // Since we've checkpointed, we can clear out uncommitted set, since a statement run afterwards // should see all this data. uncommittedPhysicalNames.clear(); } catch (TransactionFailureException e) { throw new SQLException(e); - } + } } // Since we're querying our own table while mutating it, we must exclude // see our current mutations, otherwise we can get erroneous results (for DELETE) @@ -356,7 +360,7 @@ public class MutationState implements SQLCloseable { } public PhoenixConnection getConnection() { - return connection; + return connection; } // Kept private as the Transaction may change when check pointed. 
Keeping it private ensures @@ -366,7 +370,7 @@ public class MutationState implements SQLCloseable { } public boolean isTransactionStarted() { - return getTransaction() != null; + return getTransaction() != null; } public long getInitialWritePointer() { @@ -391,11 +395,11 @@ public class MutationState implements SQLCloseable { throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException(); } - if (connection.getSCN() != null) { - throw new SQLExceptionInfo.Builder( - SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET) - .build().buildException(); - } + if (connection.getSCN() != null) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET) + .build().buildException(); + } try { if (!isTransactionStarted()) { @@ -460,9 +464,9 @@ public class MutationState implements SQLCloseable { // Loop through new rows and replace existing with new for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) { // Replace existing row with new row - RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue()); + RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue()); if (existingRowMutationState != null) { - Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues(); + Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues(); if (existingValues != PRow.DELETE_MARKER) { Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues(); // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. 
@@ -502,7 +506,7 @@ public class MutationState implements SQLCloseable { } - private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) { + private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) { RowKeySchema schema = table.getRowKeySchema(); int rowTimestampColPos = table.getRowTimestampColPos(); Field rowTimestampField = schema.getField(rowTimestampColPos); @@ -523,7 +527,7 @@ public class MutationState implements SQLCloseable { return ptr; } - private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) { + private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) { final PTable table = tableRef.getTable(); final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism (table.isImmutableRows() || includeMutableIndexes) ? 
@@ -554,13 +558,13 @@ public class MutationState implements SQLCloseable { connection.getKeyValueBuilder(), connection); // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map if (!sendAll) { - TableRef key = new TableRef(index); - Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key); - if (rowToColumnMap!=null) { - final List<Mutation> deleteMutations = Lists.newArrayList(); - generateMutations(tableRef, timestamp, rowToColumnMap, deleteMutations, null); - indexMutations.addAll(deleteMutations); - } + TableRef key = new TableRef(index); + Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key); + if (rowToColumnMap!=null) { + final List<Mutation> deleteMutations = Lists.newArrayList(); + generateMutations(tableRef, timestamp, rowToColumnMap, deleteMutations, null); + indexMutations.addAll(deleteMutations); + } } } catch (SQLException e) { throw new IllegalDataException(e); @@ -676,32 +680,32 @@ public class MutationState implements SQLCloseable { }; } - /** - * Validates that the meta data is valid against the server meta data if we haven't yet done so. - * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data - * has changed. 
- * @param connection - * @return the server time to use for the upsert - * @throws SQLException if the table or any columns no longer exist - */ - private long[] validateAll() throws SQLException { - int i = 0; - long[] timeStamps = new long[this.mutations.size()]; - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) { - TableRef tableRef = entry.getKey(); - timeStamps[i++] = validate(tableRef, entry.getValue()); - } - return timeStamps; - } - - private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException { - Long scn = connection.getSCN(); - MetaDataClient client = new MetaDataClient(connection); - long serverTimeStamp = tableRef.getTimeStamp(); - // If we're auto committing, we've already validated the schema when we got the ColumnResolver, - // so no need to do it again here. - if (!connection.getAutoCommit()) { - PTable table = tableRef.getTable(); + /** + * Validates that the meta data is valid against the server meta data if we haven't yet done so. + * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data + * has changed. 
+ * @param connection + * @return the server time to use for the upsert + * @throws SQLException if the table or any columns no longer exist + */ + private long[] validateAll() throws SQLException { + int i = 0; + long[] timeStamps = new long[this.mutations.size()]; + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) { + TableRef tableRef = entry.getKey(); + timeStamps[i++] = validate(tableRef, entry.getValue()); + } + return timeStamps; + } + + private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException { + Long scn = connection.getSCN(); + MetaDataClient client = new MetaDataClient(connection); + long serverTimeStamp = tableRef.getTimeStamp(); + // If we're auto committing, we've already validated the schema when we got the ColumnResolver, + // so no need to do it again here. + if (!connection.getAutoCommit()) { + PTable table = tableRef.getTable(); MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString()); PTable resolvedTable = result.getTable(); if (resolvedTable == null) { @@ -717,14 +721,14 @@ public class MutationState implements SQLCloseable { // TODO: use bitset? PColumn[] columns = new PColumn[resolvedTable.getColumns().size()]; for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) { - RowMutationState valueEntry = rowEntry.getValue(); + RowMutationState valueEntry = rowEntry.getValue(); if (valueEntry != null) { - Map<PColumn, byte[]> colValues = valueEntry.getColumnValues(); - if (colValues != PRow.DELETE_MARKER) { + Map<PColumn, byte[]> colValues = valueEntry.getColumnValues(); + if (colValues != PRow.DELETE_MARKER) { for (PColumn column : colValues.keySet()) { columns[column.getPosition()] = column; } - } + } } } for (PColumn column : columns) { @@ -735,8 +739,8 @@ public class MutationState implements SQLCloseable { } } } - return scn == null ? 
serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn; - } + return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn; + } private static long calculateMutationSize(List<Mutation> mutations) { long byteSize = 0; @@ -845,74 +849,74 @@ public class MutationState implements SQLCloseable { // add tracing for this operation try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) { Span span = trace.getSpan(); - ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); - boolean isTransactional; - while (tableRefIterator.hasNext()) { - // at this point we are going through mutations for each table - final TableRef tableRef = tableRefIterator.next(); - valuesMap = mutations.get(tableRef); - if (valuesMap == null || valuesMap.isEmpty()) { - continue; - } + ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); + boolean isTransactional; + while (tableRefIterator.hasNext()) { + // at this point we are going through mutations for each table + final TableRef tableRef = tableRefIterator.next(); + valuesMap = mutations.get(tableRef); + if (valuesMap == null || valuesMap.isEmpty()) { + continue; + } // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely) long serverTimestamp = serverTimeStamps == null ? 
validate(tableRef, valuesMap) : serverTimeStamps[i++]; - final PTable table = tableRef.getTable(); - // Track tables to which we've sent uncommitted data - if (isTransactional = table.isTransactional()) { - txTableRefs.add(tableRef); - uncommittedPhysicalNames.add(table.getPhysicalName().getString()); - } - boolean isDataTable = true; + final PTable table = tableRef.getTable(); + // Track tables to which we've sent uncommitted data + if (isTransactional = table.isTransactional()) { + txTableRefs.add(tableRef); + uncommittedPhysicalNames.add(table.getPhysicalName().getString()); + } + boolean isDataTable = true; table.getIndexMaintainers(indexMetaDataPtr, connection); - Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll); - while (mutationsIterator.hasNext()) { - Pair<byte[],List<Mutation>> pair = mutationsIterator.next(); - byte[] htableName = pair.getFirst(); - List<Mutation> mutationList = pair.getSecond(); - - //create a span per target table - //TODO maybe we can be smarter about the table name to string here? - Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName)); - - int retryCount = 0; - boolean shouldRetry = false; - do { - final ServerCache cache = isDataTable ? setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr) : null; - - // If we haven't retried yet, retry for this case only, as it's possible that - // a split will occur after we send the index metadata cache to all known - // region servers. 
- shouldRetry = cache != null; - SQLException sqlE = null; - HTableInterface hTable = connection.getQueryServices().getTable(htableName); - try { - if (isTransactional) { - // If we have indexes, wrap the HTable in a delegate HTable that - // will attach the necessary index meta data in the event of a - // rollback - if (!table.getIndexes().isEmpty()) { - hTable = new MetaDataAwareHTable(hTable, tableRef); - } - TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table); - // Don't add immutable indexes (those are the only ones that would participate - // during a commit), as we don't need conflict detection for these. - if (isDataTable) { - // Even for immutable, we need to do this so that an abort has the state - // necessary to generate the rows to delete. - addTransactionParticipant(txnAware); - } else { - txnAware.startTx(getTransaction()); - } - hTable = txnAware; - } - long numMutations = mutationList.size(); + Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll); + while (mutationsIterator.hasNext()) { + Pair<byte[],List<Mutation>> pair = mutationsIterator.next(); + byte[] htableName = pair.getFirst(); + List<Mutation> mutationList = pair.getSecond(); + + //create a span per target table + //TODO maybe we can be smarter about the table name to string here? + Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName)); + + int retryCount = 0; + boolean shouldRetry = false; + do { + final ServerCache cache = isDataTable ? setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr) : null; + + // If we haven't retried yet, retry for this case only, as it's possible that + // a split will occur after we send the index metadata cache to all known + // region servers. 
+ shouldRetry = cache != null; + SQLException sqlE = null; + HTableInterface hTable = connection.getQueryServices().getTable(htableName); + try { + if (isTransactional) { + // If we have indexes, wrap the HTable in a delegate HTable that + // will attach the necessary index meta data in the event of a + // rollback + if (!table.getIndexes().isEmpty()) { + hTable = new MetaDataAwareHTable(hTable, tableRef); + } + TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table); + // Don't add immutable indexes (those are the only ones that would participate + // during a commit), as we don't need conflict detection for these. + if (isDataTable) { + // Even for immutable, we need to do this so that an abort has the state + // necessary to generate the rows to delete. + addTransactionParticipant(txnAware); + } else { + txnAware.startTx(getTransaction()); + } + hTable = txnAware; + } + long numMutations = mutationList.size(); GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); long startTime = System.currentTimeMillis(); child.addTimelineAnnotation("Attempt " + retryCount); - hTable.batch(mutationList); - child.stop(); - child.stop(); + hTable.batch(mutationList); + child.stop(); + child.stop(); shouldRetry = false; long mutationCommitTime = System.currentTimeMillis() - startTime; GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime); @@ -920,80 +924,80 @@ public class MutationState implements SQLCloseable { long mutationSizeBytes = calculateMutationSize(mutationList); MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime); mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric); - } catch (Exception e) { - SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); - if (inferredE != null) { - if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) { - // Swallow this exception once, as it's 
possible that we split after sending the index metadata - // and one of the region servers doesn't have it. This will cause it to have it the next go around. - // If it fails again, we don't retry. - String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE; - logger.warn(LogUtil.addCustomAnnotations(msg, connection)); - connection.getQueryServices().clearTableRegionCache(htableName); - - // add a new child span as this one failed - child.addTimelineAnnotation(msg); - child.stop(); - child = Tracing.child(span,"Failed batch, attempting retry"); - - continue; - } - e = inferredE; - } - // Throw to client an exception that indicates the statements that - // were not committed successfully. - sqlE = new CommitException(e, getUncommittedStatementIndexes()); - } finally { - try { - if (cache != null) { - cache.close(); - } - } finally { - try { - hTable.close(); - } - catch (IOException e) { - if (sqlE != null) { - sqlE.setNextException(ServerUtil.parseServerException(e)); - } else { - sqlE = ServerUtil.parseServerException(e); - } - } - if (sqlE != null) { - throw sqlE; - } - } - } - } while (shouldRetry && retryCount++ < 1); - isDataTable = false; - } - if (tableRef.getTable().getType() != PTableType.INDEX) { - numRows -= valuesMap.size(); - } - // For transactions, track the statement indexes as we send data - // over because our CommitException should include all statements - // involved in the transaction since none of them would have been - // committed in the event of a failure. - if (isTransactional) { - addUncommittedStatementIndexes(valuesMap.values()); - if (txMutations == null) { - txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); - } - // Keep all mutations we've encountered until a commit or rollback. - // This is not ideal, but there's not good way to get the values back - // in the event that we need to replay the commit. 
- txMutations.put(tableRef, valuesMap); - } + } catch (Exception e) { + SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); + if (inferredE != null) { + if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) { + // Swallow this exception once, as it's possible that we split after sending the index metadata + // and one of the region servers doesn't have it. This will cause it to have it the next go around. + // If it fails again, we don't retry. + String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE; + logger.warn(LogUtil.addCustomAnnotations(msg, connection)); + connection.getQueryServices().clearTableRegionCache(htableName); + + // add a new child span as this one failed + child.addTimelineAnnotation(msg); + child.stop(); + child = Tracing.child(span,"Failed batch, attempting retry"); + + continue; + } + e = inferredE; + } + // Throw to client an exception that indicates the statements that + // were not committed successfully. + sqlE = new CommitException(e, getUncommittedStatementIndexes()); + } finally { + try { + if (cache != null) { + cache.close(); + } + } finally { + try { + hTable.close(); + } + catch (IOException e) { + if (sqlE != null) { + sqlE.setNextException(ServerUtil.parseServerException(e)); + } else { + sqlE = ServerUtil.parseServerException(e); + } + } + if (sqlE != null) { + throw sqlE; + } + } + } + } while (shouldRetry && retryCount++ < 1); + isDataTable = false; + } + if (tableRef.getTable().getType() != PTableType.INDEX) { + numRows -= valuesMap.size(); + } + // For transactions, track the statement indexes as we send data + // over because our CommitException should include all statements + // involved in the transaction since none of them would have been + // committed in the event of a failure. 
+ if (isTransactional) { + addUncommittedStatementIndexes(valuesMap.values()); + if (txMutations == null) { + txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); + } + // Keep all mutations we've encountered until a commit or rollback. + // This is not ideal, but there's no good way to get the values back + // in the event that we need to replay the commit. + txMutations.put(tableRef, valuesMap); + } // Remove batches as we process them - if (sendAll) { - // Iterating through map key set in this case, so we cannot use - // the remove method without getting a concurrent modification - // exception. - tableRefIterator.remove(); - } else { - mutations.remove(tableRef); - } - } + if (sendAll) { + // Iterating through map key set in this case, so we cannot use + // the remove method without getting a concurrent modification + // exception. + tableRefIterator.remove(); + } else { + mutations.remove(tableRef); + } + } } } @@ -1006,7 +1010,7 @@ public class MutationState implements SQLCloseable { } public static Transaction decodeTransaction(byte[] txnBytes) throws IOException { - return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes); + return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes); } private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? 
extends Mutation> mutations, @@ -1059,10 +1063,10 @@ public class MutationState implements SQLCloseable { } private int[] getUncommittedStatementIndexes() { - for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) { - addUncommittedStatementIndexes(rowMutationMap.values()); - } - return uncommittedStatementIndexes; + for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) { + addUncommittedStatementIndexes(rowMutationMap.values()); + } + return uncommittedStatementIndexes; } @Override @@ -1101,9 +1105,9 @@ public class MutationState implements SQLCloseable { Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); int retryCount = 0; do { - boolean sendSuccessful=false; - boolean retryCommit = false; - SQLException sqlE = null; + boolean sendSuccessful=false; + boolean retryCommit = false; + SQLException sqlE = null; try { send(); txMutations = this.txMutations; @@ -1121,7 +1125,8 @@ public class MutationState implements SQLCloseable { finishSuccessful = true; } } catch (TransactionFailureException e) { - retryCommit = (e instanceof TransactionConflictException && retryCount == 0); + if (logger.isInfoEnabled()) logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + " with retry count of " + retryCount); + retryCommit = (e instanceof TransactionConflictException && retryCount < MAX_COMMIT_RETRIES); txFailure = e; SQLException nextE = TransactionUtil.getTransactionFailureException(e); if (sqlE == null) { @@ -1134,7 +1139,9 @@ public class MutationState implements SQLCloseable { if (!finishSuccessful) { try { txContext.abort(txFailure); + if (logger.isInfoEnabled()) logger.info("Abort successful"); } catch (TransactionFailureException e) { + if (logger.isInfoEnabled()) logger.info("Abort failed with " + e); SQLException nextE = TransactionUtil.getTransactionFailureException(e); if (sqlE == null) { sqlE = nextE; @@ -1151,8 +1158,15 @@ public class 
MutationState implements SQLCloseable { } finally { if (retryCommit) { startTransaction(); + // Add back read fences + Set<TableRef> txTableRefs = txMutations.keySet(); + for (TableRef tableRef : txTableRefs) { + PTable dataTable = tableRef.getTable(); + addReadFence(dataTable); + } try { - retryCommit = wasIndexAdded(txMutations.keySet()); + // Only retry if an index was added + retryCommit = wasIndexAdded(txTableRefs); } catch (SQLException e) { retryCommit = false; if (sqlE == null) { @@ -1173,7 +1187,9 @@ public class MutationState implements SQLCloseable { break; } retryCount++; - mutations.putAll(txMutations); + if (txMutations != null) { + mutations.putAll(txMutations); + } } while (true); } @@ -1183,6 +1199,7 @@ public class MutationState implements SQLCloseable { * @throws SQLException */ private boolean wasIndexAdded(Set<TableRef> txTableRefs) throws SQLException { + if (logger.isInfoEnabled()) logger.info("Checking for index updates as of " + getInitialWritePointer()); MetaDataClient client = new MetaDataClient(connection); PMetaData cache = connection.getMetaDataCache(); boolean addedIndexes = false; @@ -1198,6 +1215,7 @@ public class MutationState implements SQLCloseable { throw new TableNotFoundException(dataTable.getSchemaName().getString(), dataTable.getTableName().getString()); } if (!result.wasUpdated()) { + if (logger.isInfoEnabled()) logger.info("No updates to " + dataTable.getName().getString() + " as of " + timestamp); continue; } if (!addedIndexes) { @@ -1205,8 +1223,10 @@ public class MutationState implements SQLCloseable { // that an index was dropped and recreated with the same name but different // indexed/covered columns. addedIndexes = (!oldIndexes.equals(result.getTable().getIndexes())); + if (logger.isInfoEnabled()) logger.info((addedIndexes ? 
"Updates " : "No updates ") + "as of " + timestamp + " to " + dataTable.getName().getString() + " with indexes " + dataTable.getIndexes()); } } + if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : "No updates ") + "to indexes as of " + getInitialWritePointer()); return addedIndexes; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/13699371/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index ee212ed..d134f08 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -479,8 +479,8 @@ public class MetaDataClient { // Do not make rpc to getTable if // 1. table is a system table // 2. table was already resolved as of that timestamp - if (table != null && !alwaysHitServer - && (systemTable || resolvedTimestamp == tableResolvedTimestamp || connection.getMetaDataCache().getAge(tableRef) < table.getUpdateCacheFrequency())) { + if (table != null && !alwaysHitServer + && (systemTable || resolvedTimestamp == tableResolvedTimestamp || connection.getMetaDataCache().getAge(tableRef) < table.getUpdateCacheFrequency())) { return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, QueryConstants.UNSET_TIMESTAMP, table); } @@ -1383,6 +1383,7 @@ public class MetaDataClient { return new MutationState(0,connection); } + if (logger.isInfoEnabled()) logger.info("Created index " + table.getName().getString() + " at " + table.getTimeStamp()); // In async process, we return immediately as the MR job needs to be triggered . 
if(statement.isAsync()) { return new MutationState(0, connection); @@ -2887,19 +2888,19 @@ public class MetaDataClient { String fullTableName = SchemaUtil.getTableName(schemaName, tableName); long resolvedTimeStamp = TransactionUtil.getResolvedTime(connection, result); if (table.getIndexes().isEmpty() || (numPkColumnsAdded==0 && !nonTxToTx)) { - connection.addColumn( - tenantId, - fullTableName, - columns, - result.getMutationTime(), - seqNum, - isImmutableRows == null ? table.isImmutableRows() : isImmutableRows, - disableWAL == null ? table.isWALDisabled() : disableWAL, - multiTenant == null ? table.isMultiTenant() : multiTenant, - storeNulls == null ? table.getStoreNulls() : storeNulls, - isTransactional == null ? table.isTransactional() : isTransactional, - updateCacheFrequency == null ? table.getUpdateCacheFrequency() : updateCacheFrequency, - resolvedTimeStamp); + connection.addColumn( + tenantId, + fullTableName, + columns, + result.getMutationTime(), + seqNum, + isImmutableRows == null ? table.isImmutableRows() : isImmutableRows, + disableWAL == null ? table.isWALDisabled() : disableWAL, + multiTenant == null ? table.isMultiTenant() : multiTenant, + storeNulls == null ? table.getStoreNulls() : storeNulls, + isTransactional == null ? table.isTransactional() : isTransactional, + updateCacheFrequency == null ? table.getUpdateCacheFrequency() : updateCacheFrequency, + resolvedTimeStamp); } else if (updateCacheFrequency != null) { // Force removal from cache as the update cache frequency has changed // Note that clients outside this JVM won't be affected.