Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/129#discussion_r45307321
--- Diff:
phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---
@@ -301,28 +476,81 @@ public void remove() {
};
}
+
+ private void generateMutations(final TableRef tableRef, long timestamp,
+ final Map<ImmutableBytesPtr, RowMutationState> values,
+ final List<Mutation> mutationList,
+ final List<Mutation> mutationsPertainingToIndex) {
+ final PTable table = tableRef.getTable();
+ boolean tableWithRowTimestampCol = table.getRowTimestampColPos()
!= -1;
+ Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>>
iterator = values.entrySet().iterator();
+ long timestampToUse = timestamp;
+ while (iterator.hasNext()) {
+ Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry =
iterator.next();
+ ImmutableBytesPtr key = rowEntry.getKey();
+ RowMutationState state = rowEntry.getValue();
+ if (tableWithRowTimestampCol) {
+ RowTimestampColInfo rowTsColInfo =
state.getRowTimestampColInfo();
+ if (rowTsColInfo.useServerTimestamp()) {
+ // regenerate the key with this timestamp.
+ key = getNewRowKeyWithRowTimestamp(key,
timestampToUse, table);
+ } else {
+ if (rowTsColInfo.getTimestamp() != null) {
+ timestampToUse = rowTsColInfo.getTimestamp();
+ }
+ }
+ }
+ PRow row =
tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestampToUse,
key);
+ List<Mutation> rowMutations,
rowMutationsPertainingToIndex;
+ if (rowEntry.getValue().getColumnValues() ==
PRow.DELETE_MARKER) { // means delete
+ row.delete();
+ rowMutations = row.toRowMutations();
+ // Row deletes for index tables are processed by
running a re-written query
+ // against the index table (as this allows for
flexibility in being able to
+ // delete rows).
+ rowMutationsPertainingToIndex =
Collections.emptyList();
+ } else {
+ for (Map.Entry<PColumn,byte[]> valueEntry :
rowEntry.getValue().getColumnValues().entrySet()) {
+ row.setValue(valueEntry.getKey(),
valueEntry.getValue());
+ }
+ rowMutations = row.toRowMutations();
+ rowMutationsPertainingToIndex = rowMutations;
+ }
+ mutationList.addAll(rowMutations);
+ if (mutationsPertainingToIndex != null)
mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
+ }
+ }
/**
* Get the unsorted list of HBase mutations for the tables with
uncommitted data.
* @return list of HBase mutations for uncommitted data.
*/
+ public Iterator<Pair<byte[],List<Mutation>>> toMutations(Long
timestamp) {
+ return toMutations(false, timestamp);
+ }
+
public Iterator<Pair<byte[],List<Mutation>>> toMutations() {
- return toMutations(false);
+ return toMutations(false, null);
}
public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean
includeMutableIndexes) {
+ return toMutations(includeMutableIndexes, null);
+ }
+
+ public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean
includeMutableIndexes, final Long tableTimestamp) {
final Iterator<Map.Entry<TableRef,
Map<ImmutableBytesPtr,RowMutationState>>> iterator =
this.mutations.entrySet().iterator();
if (!iterator.hasNext()) {
return Iterators.emptyIterator();
}
Long scn = connection.getSCN();
- final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP :
scn;
+ final long timestamp = (tableTimestamp!=null &&
tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null
? HConstants.LATEST_TIMESTAMP : scn);
+// final long timestamp = (scn == null ?
HConstants.LATEST_TIMESTAMP : scn);
--- End diff --
Remove commented-out code.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---