[
https://issues.apache.org/jira/browse/PHOENIX-1674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15013017#comment-15013017
]
ASF GitHub Bot commented on PHOENIX-1674:
-----------------------------------------
GitHub user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/129#discussion_r45307398
--- Diff:
phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---
@@ -410,149 +642,293 @@ private static long
calculateMutationSize(List<Mutation> mutations) {
return byteSize;
}
+ private boolean hasKeyValueColumn(PTable table, PTable index) {
+ IndexMaintainer maintainer = index.getIndexMaintainer(table,
connection);
+ return !maintainer.getAllColumns().isEmpty();
+ }
+
+ private void divideImmutableIndexes(Iterator<PTable>
enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable>
keyValueIndexes) {
+ while (enabledImmutableIndexes.hasNext()) {
+ PTable index = enabledImmutableIndexes.next();
+ if (index.getIndexType() != IndexType.LOCAL) {
+ if (hasKeyValueColumn(table, index)) {
+ keyValueIndexes.add(index);
+ } else {
+ rowKeyIndexes.add(index);
+ }
+ }
+ }
+ }
+ private class MetaDataAwareHTable extends DelegateHTableInterface {
+ private final TableRef tableRef;
+
+ private MetaDataAwareHTable(HTableInterface delegate, TableRef
tableRef) {
+ super(delegate);
+ this.tableRef = tableRef;
+ }
+
+ /**
+ * Called by Tephra when a transaction is aborted. We have this
wrapper so that we get an
+ * opportunity to attach our index meta data to the mutations such
that we can also undo
+ * the index mutations.
+ */
+ @Override
+ public void delete(List<Delete> deletes) throws IOException {
+ try {
+ PTable table = tableRef.getTable();
+ List<PTable> indexes = table.getIndexes();
+ Iterator<PTable> enabledIndexes =
IndexMaintainer.nonDisabledIndexIterator(indexes.iterator());
+ if (enabledIndexes.hasNext()) {
+ List<PTable> keyValueIndexes = Collections.emptyList();
+ ImmutableBytesWritable indexMetaDataPtr = new
ImmutableBytesWritable();
+ boolean attachMetaData =
table.getIndexMaintainers(indexMetaDataPtr, connection);
+ if (table.isImmutableRows()) {
+ List<PTable> rowKeyIndexes =
Lists.newArrayListWithExpectedSize(indexes.size());
+ keyValueIndexes =
Lists.newArrayListWithExpectedSize(indexes.size());
+ divideImmutableIndexes(enabledIndexes, table,
rowKeyIndexes, keyValueIndexes);
+ // Generate index deletes for immutable indexes
that only reference row key
+ // columns and submit directly here.
+ ImmutableBytesWritable ptr = new
ImmutableBytesWritable();
+ for (PTable index : rowKeyIndexes) {
+ List<Delete> indexDeletes =
IndexUtil.generateDeleteIndexData(table, index, deletes, ptr,
connection.getKeyValueBuilder(), connection);
+ HTableInterface hindex =
connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
+ hindex.delete(indexDeletes);
+ }
+ }
+
+ // If we have mutable indexes, local immutable
indexes, or global immutable indexes
+ // that reference key value columns, setup index meta
data and attach here. In this
+ // case updates to the indexes will be generated on
the server side.
+ // An alternative would be to let Tephra track the row
keys for the immutable index
+ // by adding it as a transaction participant (soon we
can prevent any conflict
+ // detection from occurring) with the downside being
the additional memory required.
+ if (!keyValueIndexes.isEmpty()) {
+ attachMetaData = true;
+ IndexMaintainer.serializeAdditional(table,
indexMetaDataPtr, keyValueIndexes, connection);
+ }
+ if (attachMetaData) {
+ setMetaDataOnMutations(tableRef, deletes,
indexMetaDataPtr);
+ }
+ }
+ delegate.delete(deletes);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
@SuppressWarnings("deprecation")
- public void commit() throws SQLException {
+ private void send(Iterator<TableRef> tableRefIterator) throws
SQLException {
int i = 0;
- PName tenantId = connection.getTenantId();
- long[] serverTimeStamps = validate();
- Iterator<Map.Entry<TableRef,
Map<ImmutableBytesPtr,RowMutationState>>> iterator =
this.mutations.entrySet().iterator();
+ long[] serverTimeStamps = null;
+ boolean sendAll = false;
+ // Validate up front if not transactional so that we
+ if (tableRefIterator == null) {
+ serverTimeStamps = validateAll();
+ tableRefIterator = mutations.keySet().iterator();
+ sendAll = true;
+ }
+
// add tracing for this operation
try (TraceScope trace = Tracing.startNewSpan(connection,
"Committing mutations to tables")) {
Span span = trace.getSpan();
- while (iterator.hasNext()) {
- Map.Entry<TableRef,
Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
- // at this point we are going through mutations for each
table
-
- Map<ImmutableBytesPtr,RowMutationState> valuesMap =
entry.getValue();
- // above is mutations for a table where the first part is
the row key and the second part is column values.
-
- TableRef tableRef = entry.getKey();
- PTable table = tableRef.getTable();
- table.getIndexMaintainers(tempPtr, connection);
- boolean hasIndexMaintainers = tempPtr.getLength() > 0;
- boolean isDataTable = true;
- long serverTimestamp = serverTimeStamps[i++];
- Iterator<Pair<byte[],List<Mutation>>> mutationsIterator =
addRowMutations(tableRef, valuesMap, serverTimestamp, false);
- // above returns an iterator of pair where the first
- while (mutationsIterator.hasNext()) {
- Pair<byte[],List<Mutation>> pair =
mutationsIterator.next();
- byte[] htableName = pair.getFirst();
- List<Mutation> mutations = pair.getSecond();
-
- //create a span per target table
- //TODO maybe we can be smarter about the table name to
string here?
- Span child = Tracing.child(span,"Writing mutation
batch for table: "+Bytes.toString(htableName));
-
- int retryCount = 0;
- boolean shouldRetry = false;
- do {
- ServerCache cache = null;
- if (hasIndexMaintainers && isDataTable) {
- byte[] attribValue = null;
- byte[] uuidValue;
- if
(IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations,
tempPtr.getLength())) {
- IndexMetaDataCacheClient client = new
IndexMetaDataCacheClient(connection, tableRef);
- cache =
client.addIndexMetadataCache(mutations, tempPtr);
- child.addTimelineAnnotation("Updated index
metadata cache");
- uuidValue = cache.getId();
- // If we haven't retried yet, retry for
this case only, as it's possible that
- // a split will occur after we send the
index metadata cache to all known
- // region servers.
- shouldRetry = true;
- } else {
- attribValue =
ByteUtil.copyKeyBytesIfNecessary(tempPtr);
- uuidValue = ServerCacheClient.generateId();
- }
- // Either set the UUID to be able to access
the index metadata from the cache
- // or set the index metadata directly on the
Mutation
- for (Mutation mutation : mutations) {
- if (tenantId != null) {
- byte[] tenantIdBytes =
ScanUtil.getTenantIdBytes(
- table.getRowKeySchema(),
- table.getBucketNum()!=null,
- tenantId);
-
mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
- }
-
mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- if (attribValue != null) {
-
mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
- }
- }
- }
-
- SQLException sqlE = null;
- HTableInterface hTable =
connection.getQueryServices().getTable(htableName);
- try {
- long numMutations = mutations.size();
+ ImmutableBytesWritable indexMetaDataPtr = new
ImmutableBytesWritable();
+ while (tableRefIterator.hasNext()) {
+ // at this point we are going through mutations for
each table
+ TableRef tableRef = tableRefIterator.next();
+ Map<ImmutableBytesPtr, RowMutationState> valuesMap =
mutations.get(tableRef);
+ if (valuesMap == null || valuesMap.isEmpty()) {
+ continue;
+ }
+ PTable table = tableRef.getTable();
+ // Track tables to which we've sent uncommitted data
+ if (table.isTransactional()) {
+
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+ }
+ table.getIndexMaintainers(indexMetaDataPtr, connection);
+ boolean isDataTable = true;
+ // Validate as we go if transactional since we can undo if
a problem occurs (which is unlikely)
+ long serverTimestamp = serverTimeStamps == null ?
validate(tableRef, valuesMap) : serverTimeStamps[i++];
+ Iterator<Pair<byte[],List<Mutation>>> mutationsIterator =
addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
+ while (mutationsIterator.hasNext()) {
+ Pair<byte[],List<Mutation>> pair =
mutationsIterator.next();
+ byte[] htableName = pair.getFirst();
+ List<Mutation> mutationList = pair.getSecond();
+
+ //create a span per target table
+ //TODO maybe we can be smarter about the table name to
string here?
+ Span child = Tracing.child(span,"Writing mutation batch
for table: "+Bytes.toString(htableName));
+
+ int retryCount = 0;
+ boolean shouldRetry = false;
+ do {
+ ServerCache cache = null;
+ if (isDataTable) {
+ cache = setMetaDataOnMutations(tableRef,
mutationList, indexMetaDataPtr);
+ }
+
+ // If we haven't retried yet, retry for this case
only, as it's possible that
+ // a split will occur after we send the index
metadata cache to all known
+ // region servers.
+ shouldRetry = cache != null;
+ SQLException sqlE = null;
+ HTableInterface hTable =
connection.getQueryServices().getTable(htableName);
+ try {
+ if (table.isTransactional()) {
+ // If we have indexes, wrap the HTable in a
delegate HTable that
+ // will attach the necessary index meta
data in the event of a
+ // rollback
+ if (!table.getIndexes().isEmpty()) {
+ hTable = new
MetaDataAwareHTable(hTable, tableRef);
+ }
+ TransactionAwareHTable txnAware =
TransactionUtil.getTransactionAwareHTable(hTable, table);
+ // Don't add immutable indexes (those are
the only ones that would participate
+ // during a commit), as we don't need
conflict detection for these.
+ if (isDataTable) {
+ // Even for immutable, we need to do
this so that an abort has the state
+ // necessary to generate the rows to
delete.
+ addTransactionParticipant(txnAware);
+ } else {
+ txnAware.startTx(getTransaction());
+ }
+ hTable = txnAware;
+ }
+ long numMutations = mutationList.size();
GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
long startTime = System.currentTimeMillis();
- child.addTimelineAnnotation("Attempt " +
retryCount);
- hTable.batch(mutations);
- child.stop();
+ child.addTimelineAnnotation("Attempt " +
retryCount);;
+ hTable.batch(mutationList);
+ child.stop();
+ child.stop();
shouldRetry = false;
long mutationCommitTime =
System.currentTimeMillis() - startTime;
GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
- long mutationSizeBytes =
calculateMutationSize(mutations);
+ long mutationSizeBytes =
calculateMutationSize(mutationList);
MutationMetric mutationsMetric = new
MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName),
mutationsMetric);
- } catch (Exception e) {
- SQLException inferredE =
ServerUtil.parseServerExceptionOrNull(e);
- if (inferredE != null) {
- if (shouldRetry && retryCount == 0 &&
inferredE.getErrorCode() ==
SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
- // Swallow this exception once, as
it's possible that we split after sending the index metadata
- // and one of the region servers
doesn't have it. This will cause it to have it the next go around.
- // If it fails again, we don't retry.
- String msg = "Swallowing exception and
retrying after clearing meta cache on connection. " + inferredE;
-
logger.warn(LogUtil.addCustomAnnotations(msg, connection));
-
connection.getQueryServices().clearTableRegionCache(htableName);
+ } catch (Exception e) {
+ SQLException inferredE =
ServerUtil.parseServerExceptionOrNull(e);
+ if (inferredE != null) {
+ if (shouldRetry && retryCount == 0 &&
inferredE.getErrorCode() ==
SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+ // Swallow this exception once, as it's
possible that we split after sending the index metadata
+ // and one of the region servers
doesn't have it. This will cause it to have it the next go around.
+ // If it fails again, we don't retry.
+ String msg = "Swallowing exception and
retrying after clearing meta cache on connection. " + inferredE;
+
logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+
connection.getQueryServices().clearTableRegionCache(htableName);
+
+ // add a new child span as this one
failed
+ child.addTimelineAnnotation(msg);
+ child.stop();
+ child = Tracing.child(span,"Failed
batch, attempting retry");
+
+ continue;
+ }
+ e = inferredE;
+ }
+ // Throw to client with both what was committed
so far and what is left to be committed.
+ // That way, client can either undo what was
done or try again with what was not done.
+ sqlE = new CommitException(e,
getUncommittedStatementIndexes());
+ } finally {
+ try {
+ if (cache != null) {
+ cache.close();
+ }
+ } finally {
+ try {
+ hTable.close();
+ }
+ catch (IOException e) {
+ if (sqlE != null) {
+
sqlE.setNextException(ServerUtil.parseServerException(e));
+ } else {
+ sqlE =
ServerUtil.parseServerException(e);
+ }
+ }
+ if (sqlE != null) {
+ // clear pending mutations
+ mutations.clear();
+ throw sqlE;
+ }
+ }
+ }
+ } while (shouldRetry && retryCount++ < 1);
+ isDataTable = false;
+ }
+ if (tableRef.getTable().getType() != PTableType.INDEX) {
+ numRows -= valuesMap.size();
+ }
+ // Remove batches as we process them
+ if (sendAll) {
+ tableRefIterator.remove(); // Iterating through actual
map in this case
+ } else {
+ mutations.remove(tableRef);
+ }
+ }
+ }
+ // Note that we cannot assume that *all* mutations have been sent,
since we've optimized this
+ // now to only send the mutations for the tables we're querying,
hence we've removed the
+ // assertions that we're here before.
+ }
- // add a new child span as this one
failed
- child.addTimelineAnnotation(msg);
- child.stop();
- child = Tracing.child(span,"Failed
batch, attempting retry");
+ public byte[] encodeTransaction() throws SQLException {
+ try {
+ return CODEC.encode(getTransaction());
+ } catch (IOException e) {
+ throw new SQLException(e);
+ }
+ }
+
+ public static Transaction decodeTransaction(byte[] txnBytes) throws
IOException {
+ return (txnBytes == null || txnBytes.length==0) ? null :
CODEC.decode(txnBytes);
+ }
- continue;
- }
- e = inferredE;
- }
- sqlE = new CommitException(e,
getUncommittedStatementIndexes());
- } finally {
- try {
- hTable.close();
- } catch (IOException e) {
- if (sqlE != null) {
-
sqlE.setNextException(ServerUtil.parseServerException(e));
- } else {
- sqlE =
ServerUtil.parseServerException(e);
- }
- } finally {
- try {
- if (cache != null) {
- cache.close();
- }
- } finally {
- if (sqlE != null) {
- throw sqlE;
- }
- }
- }
- }
- } while (shouldRetry && retryCount++ < 1);
- isDataTable = false;
- }
- if (tableRef.getTable().getType() != PTableType.INDEX) {
- numRows -= entry.getValue().size();
+ private ServerCache setMetaDataOnMutations(TableRef tableRef, List<?
extends Mutation> mutations,
+ ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
+ PTable table = tableRef.getTable();
+ byte[] tenantId = connection.getTenantId() == null ? null :
connection.getTenantId().getBytes();
+ ServerCache cache = null;
+ byte[] attribValue = null;
+ byte[] uuidValue = null;
+ byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
+ if (table.isTransactional()) {
+ txState = encodeTransaction();
+ }
+ boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
+ if (hasIndexMetaData) {
+ if (IndexMetaDataCacheClient.useIndexMetadataCache(connection,
mutations, indexMetaDataPtr.getLength() + txState.length)) {
+ IndexMetaDataCacheClient client = new
IndexMetaDataCacheClient(connection, tableRef);
+ cache = client.addIndexMetadataCache(mutations,
indexMetaDataPtr, txState);
+ uuidValue = cache.getId();
+ } else {
+ attribValue =
ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+ uuidValue = ServerCacheClient.generateId();
+ }
+ } else if (txState.length == 0) {
+ return null;
+ }
+ // Either set the UUID to be able to access the index metadata
from the cache
+ // or set the index metadata directly on the Mutation
+ for (Mutation mutation : mutations) {
+ if (tenantId != null) {
+ mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB,
tenantId);
+ }
+ mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ if (attribValue != null) {
+ mutation.setAttribute(PhoenixIndexCodec.INDEX_MD,
attribValue);
+ if (txState.length > 0) {
+
mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
}
- iterator.remove(); // Remove batches as we process them
+ } else if (!hasIndexMetaData && txState.length > 0) {
+ mutation.setAttribute(BaseScannerRegionObserver.TX_STATE,
txState);
}
}
- assert(numRows==0);
- assert(this.mutations.isEmpty());
+ return cache;
}
- public void rollback(PhoenixConnection connection) throws SQLException
{
+ public void clear() throws SQLException {
--- End diff --
Make this new clear() method private if possible; the patch currently declares it public.
> Snapshot isolation transaction support through Tephra
> -----------------------------------------------------
>
> Key: PHOENIX-1674
> URL: https://issues.apache.org/jira/browse/PHOENIX-1674
> Project: Phoenix
> Issue Type: Improvement
> Reporter: James Taylor
> Labels: SFDC
>
> Tephra (http://tephra.io/ and https://github.com/caskdata/tephra) is one
> option for adding transaction support to Phoenix. Let's use this JIRA to
> discuss how it could be integrated, along with the pros and cons of doing
> so.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)