Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/129#discussion_r45307398
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---
@@ -410,149 +642,293 @@ private static long calculateMutationSize(List<Mutation> mutations) {
         return byteSize;
     }
+    private boolean hasKeyValueColumn(PTable table, PTable index) {
+        IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
+        return !maintainer.getAllColumns().isEmpty();
+    }
+
+    private void divideImmutableIndexes(Iterator<PTable> enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable> keyValueIndexes) {
+        while (enabledImmutableIndexes.hasNext()) {
+            PTable index = enabledImmutableIndexes.next();
+            if (index.getIndexType() != IndexType.LOCAL) {
+                if (hasKeyValueColumn(table, index)) {
+                    keyValueIndexes.add(index);
+                } else {
+                    rowKeyIndexes.add(index);
+                }
+            }
+        }
+    }
+    private class MetaDataAwareHTable extends DelegateHTableInterface {
+        private final TableRef tableRef;
+
+        private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) {
+            super(delegate);
+            this.tableRef = tableRef;
+        }
+
+        /**
+         * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an
+         * opportunity to attach our index meta data to the mutations such that we can also undo
+         * the index mutations.
+         */
+        @Override
+        public void delete(List<Delete> deletes) throws IOException {
+            try {
+                PTable table = tableRef.getTable();
+                List<PTable> indexes = table.getIndexes();
+                Iterator<PTable> enabledIndexes = IndexMaintainer.nonDisabledIndexIterator(indexes.iterator());
+                if (enabledIndexes.hasNext()) {
+                    List<PTable> keyValueIndexes = Collections.emptyList();
+                    ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+                    boolean attachMetaData = table.getIndexMaintainers(indexMetaDataPtr, connection);
+                    if (table.isImmutableRows()) {
+                        List<PTable> rowKeyIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
+                        keyValueIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
+                        divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes);
+                        // Generate index deletes for immutable indexes that only reference row key
+                        // columns and submit directly here.
+                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                        for (PTable index : rowKeyIndexes) {
+                            List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, connection.getKeyValueBuilder(), connection);
+                            HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
+                            hindex.delete(indexDeletes);
+                        }
+                    }
+
+                    // If we have mutable indexes, local immutable indexes, or global immutable indexes
+                    // that reference key value columns, setup index meta data and attach here. In this
+                    // case updates to the indexes will be generated on the server side.
+                    // An alternative would be to let Tephra track the row keys for the immutable index
+                    // by adding it as a transaction participant (soon we can prevent any conflict
+                    // detection from occurring) with the downside being the additional memory required.
+                    if (!keyValueIndexes.isEmpty()) {
+                        attachMetaData = true;
+                        IndexMaintainer.serializeAdditional(table, indexMetaDataPtr, keyValueIndexes, connection);
+                    }
+                    if (attachMetaData) {
+                        setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr);
+                    }
+                }
+                delegate.delete(deletes);
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
    @SuppressWarnings("deprecation")
-    public void commit() throws SQLException {
+    private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
        int i = 0;
-        PName tenantId = connection.getTenantId();
-        long[] serverTimeStamps = validate();
-        Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
+        long[] serverTimeStamps = null;
+        boolean sendAll = false;
+        // Validate up front if not transactional so that we
+        if (tableRefIterator == null) {
+            serverTimeStamps = validateAll();
+            tableRefIterator = mutations.keySet().iterator();
+            sendAll = true;
+        }
+
        // add tracing for this operation
        try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
            Span span = trace.getSpan();
-            while (iterator.hasNext()) {
-                Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
-                // at this point we are going through mutations for each table
-
-                Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
-                // above is mutations for a table where the first part is the row key and the second part is column values.
-
-                TableRef tableRef = entry.getKey();
-                PTable table = tableRef.getTable();
-                table.getIndexMaintainers(tempPtr, connection);
-                boolean hasIndexMaintainers = tempPtr.getLength() > 0;
-                boolean isDataTable = true;
-                long serverTimestamp = serverTimeStamps[i++];
-                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
-                // above returns an iterator of pair where the first
-                while (mutationsIterator.hasNext()) {
-                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
-                    byte[] htableName = pair.getFirst();
-                    List<Mutation> mutations = pair.getSecond();
-
-                    //create a span per target table
-                    //TODO maybe we can be smarter about the table name to string here?
-                    Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
-
-                    int retryCount = 0;
-                    boolean shouldRetry = false;
-                    do {
-                        ServerCache cache = null;
-                        if (hasIndexMaintainers && isDataTable) {
-                            byte[] attribValue = null;
-                            byte[] uuidValue;
-                            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
-                                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-                                cache = client.addIndexMetadataCache(mutations, tempPtr);
-                                child.addTimelineAnnotation("Updated index metadata cache");
-                                uuidValue = cache.getId();
-                                // If we haven't retried yet, retry for this case only, as it's possible that
-                                // a split will occur after we send the index metadata cache to all known
-                                // region servers.
-                                shouldRetry = true;
-                            } else {
-                                attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr);
-                                uuidValue = ServerCacheClient.generateId();
-                            }
-                            // Either set the UUID to be able to access the index metadata from the cache
-                            // or set the index metadata directly on the Mutation
-                            for (Mutation mutation : mutations) {
-                                if (tenantId != null) {
-                                    byte[] tenantIdBytes = ScanUtil.getTenantIdBytes(
-                                            table.getRowKeySchema(),
-                                            table.getBucketNum()!=null,
-                                            tenantId);
-                                    mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
-                                }
-                                mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                if (attribValue != null) {
-                                    mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                                }
-                            }
-                        }
-
-                        SQLException sqlE = null;
-                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
-                        try {
-                            long numMutations = mutations.size();
+            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+            while (tableRefIterator.hasNext()) {
+                // at this point we are going through mutations for each table
+                TableRef tableRef = tableRefIterator.next();
+                Map<ImmutableBytesPtr, RowMutationState> valuesMap = mutations.get(tableRef);
+                if (valuesMap == null || valuesMap.isEmpty()) {
+                    continue;
+                }
+                PTable table = tableRef.getTable();
+                // Track tables to which we've sent uncommitted data
+                if (table.isTransactional()) {
+                    uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+                }
+                table.getIndexMaintainers(indexMetaDataPtr, connection);
+                boolean isDataTable = true;
+                // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
+                long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
+                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
+                while (mutationsIterator.hasNext()) {
+                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
+                    byte[] htableName = pair.getFirst();
+                    List<Mutation> mutationList = pair.getSecond();
+
+                    //create a span per target table
+                    //TODO maybe we can be smarter about the table name to string here?
+                    Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
+
+                    int retryCount = 0;
+                    boolean shouldRetry = false;
+                    do {
+                        ServerCache cache = null;
+                        if (isDataTable) {
+                            cache = setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr);
+                        }
+
+                        // If we haven't retried yet, retry for this case only, as it's possible that
+                        // a split will occur after we send the index metadata cache to all known
+                        // region servers.
+                        shouldRetry = cache != null;
+                        SQLException sqlE = null;
+                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
+                        try {
+                            if (table.isTransactional()) {
+                                // If we have indexes, wrap the HTable in a delegate HTable that
+                                // will attach the necessary index meta data in the event of a
+                                // rollback
+                                if (!table.getIndexes().isEmpty()) {
+                                    hTable = new MetaDataAwareHTable(hTable, tableRef);
+                                }
+                                TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table);
+                                // Don't add immutable indexes (those are the only ones that would participate
+                                // during a commit), as we don't need conflict detection for these.
+                                if (isDataTable) {
+                                    // Even for immutable, we need to do this so that an abort has the state
+                                    // necessary to generate the rows to delete.
+                                    addTransactionParticipant(txnAware);
+                                } else {
+                                    txnAware.startTx(getTransaction());
+                                }
+                                hTable = txnAware;
+                            }
+                            long numMutations = mutationList.size();
                            GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                            long startTime = System.currentTimeMillis();
-                            child.addTimelineAnnotation("Attempt " + retryCount);
-                            hTable.batch(mutations);
-                            child.stop();
+                            child.addTimelineAnnotation("Attempt " + retryCount);
+                            hTable.batch(mutationList);
+                            child.stop();
                            shouldRetry = false;
                            long mutationCommitTime = System.currentTimeMillis() - startTime;
                            GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
-                            long mutationSizeBytes = calculateMutationSize(mutations);
+                            long mutationSizeBytes = calculateMutationSize(mutationList);
                            MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
                            mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
-                        } catch (Exception e) {
-                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
-                            if (inferredE != null) {
-                                if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
-                                    // Swallow this exception once, as it's possible that we split after sending the index metadata
-                                    // and one of the region servers doesn't have it. This will cause it to have it the next go around.
-                                    // If it fails again, we don't retry.
-                                    String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
-                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
-                                    connection.getQueryServices().clearTableRegionCache(htableName);
+                        } catch (Exception e) {
+                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
+                            if (inferredE != null) {
+                                if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+                                    // Swallow this exception once, as it's possible that we split after sending the index metadata
+                                    // and one of the region servers doesn't have it. This will cause it to have it the next go around.
+                                    // If it fails again, we don't retry.
+                                    String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
+                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+                                    connection.getQueryServices().clearTableRegionCache(htableName);
+
+                                    // add a new child span as this one failed
+                                    child.addTimelineAnnotation(msg);
+                                    child.stop();
+                                    child = Tracing.child(span,"Failed batch, attempting retry");
+
+                                    continue;
+                                }
+                                e = inferredE;
+                            }
+                            // Throw to client with both what was committed so far and what is left to be committed.
+                            // That way, client can either undo what was done or try again with what was not done.
+                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
+                        } finally {
+                            try {
+                                if (cache != null) {
+                                    cache.close();
+                                }
+                            } finally {
+                                try {
+                                    hTable.close();
+                                }
+                                catch (IOException e) {
+                                    if (sqlE != null) {
+                                        sqlE.setNextException(ServerUtil.parseServerException(e));
+                                    } else {
+                                        sqlE = ServerUtil.parseServerException(e);
+                                    }
+                                }
+                                if (sqlE != null) {
+                                    // clear pending mutations
+                                    mutations.clear();
+                                    throw sqlE;
+                                }
+                            }
+                        }
+                    } while (shouldRetry && retryCount++ < 1);
+                    isDataTable = false;
+                }
+                if (tableRef.getTable().getType() != PTableType.INDEX) {
+                    numRows -= valuesMap.size();
+                }
+                // Remove batches as we process them
+                if (sendAll) {
+                    tableRefIterator.remove(); // Iterating through actual map in this case
+                } else {
+                    mutations.remove(tableRef);
+                }
+            }
+        }
+        // Note that we cannot assume that *all* mutations have been sent, since we've optimized this
+        // now to only send the mutations for the tables we're querying, hence we've removed the
+        // assertions that were here before.
+    }
-                                    // add a new child span as this one failed
-                                    child.addTimelineAnnotation(msg);
-                                    child.stop();
-                                    child = Tracing.child(span,"Failed batch, attempting retry");
+    public byte[] encodeTransaction() throws SQLException {
+        try {
+            return CODEC.encode(getTransaction());
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
+    }
+
+    public static Transaction decodeTransaction(byte[] txnBytes) throws IOException {
+        return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes);
+    }
-                                    continue;
-                                }
-                                e = inferredE;
-                            }
-                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
-                        } finally {
-                            try {
-                                hTable.close();
-                            } catch (IOException e) {
-                                if (sqlE != null) {
-                                    sqlE.setNextException(ServerUtil.parseServerException(e));
-                                } else {
-                                    sqlE = ServerUtil.parseServerException(e);
-                                }
-                            } finally {
-                                try {
-                                    if (cache != null) {
-                                        cache.close();
-                                    }
-                                } finally {
-                                    if (sqlE != null) {
-                                        throw sqlE;
-                                    }
-                                }
-                            }
-                        }
-                    } while (shouldRetry && retryCount++ < 1);
-                    isDataTable = false;
-                }
-                if (tableRef.getTable().getType() != PTableType.INDEX) {
-                    numRows -= entry.getValue().size();
+    private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
+            ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
+        PTable table = tableRef.getTable();
+        byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
+        ServerCache cache = null;
+        byte[] attribValue = null;
+        byte[] uuidValue = null;
+        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
+        if (table.isTransactional()) {
+            txState = encodeTransaction();
+        }
+        boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
+        if (hasIndexMetaData) {
+            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
+                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+                cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
+                uuidValue = cache.getId();
+            } else {
+                attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                uuidValue = ServerCacheClient.generateId();
+            }
+        } else if (txState.length == 0) {
+            return null;
+        }
+        // Either set the UUID to be able to access the index metadata from the cache
+        // or set the index metadata directly on the Mutation
+        for (Mutation mutation : mutations) {
+            if (tenantId != null) {
+                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            }
+            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+            if (attribValue != null) {
+                mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                if (txState.length > 0) {
+                    mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                }
-            iterator.remove(); // Remove batches as we process them
+            } else if (!hasIndexMetaData && txState.length > 0) {
+                mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
            }
        }
-        assert(numRows==0);
-        assert(this.mutations.isEmpty());
+        return cache;
    }
-    public void rollback(PhoenixConnection connection) throws SQLException {
+    public void clear() throws SQLException {
--- End diff --
Make this clear() method private if possible.
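
For illustration only (not part of the patch), a minimal sketch of the suggestion, assuming nothing outside MutationState needs to invoke clear(); the body below is a placeholder, not the PR's actual implementation:

    // hypothetical sketch: narrow visibility if there are no external callers
    private void clear() throws SQLException {
        // reset whatever pending state the PR's version resets, e.g. pending
        // mutations and the row counter used by the send() loop above
        this.mutations.clear();
        numRows = 0;
    }

If another class does need to reset this state (for example from the connection's rollback path), package-private would be the next narrowest scope.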
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---