jackye1995 commented on code in PR #5888:
URL: https://github.com/apache/iceberg/pull/5888#discussion_r1131772531
##########
core/src/main/java/org/apache/iceberg/SnapshotProducer.java:
##########
@@ -615,4 +630,49 @@ private static void updateTotal(
}
}
}
+
+ private Long snapshotIdToRollbackToOnConflict(Long parentSnapshotId) {
+ Long newParentSnapshotId = parentSnapshotId;
+ if (shouldRollbackReplaceOnConflict()) {
+ // add a set snapshot op on top of base to roll back to parent snapshot
+ boolean isCommitSuccessfullyApplied = false;
+ // Update parentSnapshot to it's grandParent
+ // provided parentSnapshot is of type replace and never rollback beyond startingSnapshotId
+ while (newParentSnapshotId != null
+ && DataOperations.REPLACE.equals(base.snapshot(newParentSnapshotId).operation())
+ && !newParentSnapshotId.equals(startingSnapshotId())) {
+ // create a tempTableOperation to pass base with rollback to validate of update
+ TableOperations tempTableOps = ops.temp(base);
+ newParentSnapshotId = base.snapshot(newParentSnapshotId).parentId();
+ if (newParentSnapshotId == null) {
+ return null;
+ }
+ Snapshot parentSnapshot = base.snapshot(newParentSnapshotId);
+
+ SetSnapshotOperation setSnapshotOp = new SetSnapshotOperation(tempTableOps);
+ setSnapshotOp.rollbackTo(newParentSnapshotId).commit();
+ try {
+ validate(tempTableOps.current(), parentSnapshot);
+ isCommitSuccessfullyApplied = true;
+ } catch (ValidationException validationException) {
+ // swallow the exception for re-trying
+ }
+ }
+
+ if (!isCommitSuccessfullyApplied) {
+ return null;
+ }
+
+ return newParentSnapshotId;
+ } else {
+ return null;
+ }
+ }
+
+ private Boolean shouldRollbackReplaceOnConflict() {
+ return PropertyUtil.propertyAsBoolean(
+ base.properties(),
+ TableProperties.COMMIT_ROLLBACK_REPLACE_ON_CONFLICT_ENABLED,
Review Comment:
I explored this a bit more offline, and I think it may be better to do this
through an update to the related action APIs, for example:
```
public interface SupportsRollbackReplace<T> {
T rollbackReplaceOnConflict();
}
public interface AppendFiles extends SnapshotUpdate<AppendFiles>,
SupportsRollbackReplace<AppendFiles> {
...
}
```
So that in core we can do:
```
table.newAppend().rollbackReplaceOnConflict().addFiles(...).commit()
```
The engine can then expose this as a session config or runtime option.
What do you think?
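To make the shape concrete, here is a minimal self-contained sketch of the
idea. It assumes stand-in types only: `SketchSnapshotUpdate`,
`SketchAppendFiles`, `InMemoryAppend`, `EngineSketch` and the option key
`rollback-replace-on-conflict` are hypothetical names for illustration, not
existing Iceberg APIs, and `commit()` here only records state instead of
producing a snapshot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SnapshotUpdate, reduced to what the sketch needs.
interface SketchSnapshotUpdate<T> {
  void commit();
}

// The opt-in capability proposed above: an action that can ask for
// rollback of conflicting replace snapshots.
interface SupportsRollbackReplace<T> {
  T rollbackReplaceOnConflict();
}

// Hypothetical stand-in for AppendFiles that mixes in the capability.
interface SketchAppendFiles
    extends SketchSnapshotUpdate<SketchAppendFiles>,
        SupportsRollbackReplace<SketchAppendFiles> {
  SketchAppendFiles appendFile(String path);
}

// Toy implementation: it only remembers what was requested.
final class InMemoryAppend implements SketchAppendFiles {
  private final List<String> files = new ArrayList<>();
  private boolean rollbackReplaceOnConflict = false;
  private boolean committed = false;

  @Override
  public SketchAppendFiles rollbackReplaceOnConflict() {
    this.rollbackReplaceOnConflict = true;
    return this;
  }

  @Override
  public SketchAppendFiles appendFile(String path) {
    files.add(path);
    return this;
  }

  @Override
  public void commit() {
    // A real implementation would consult the flag when a commit conflicts.
    committed = true;
  }

  boolean rollsBackReplaceOnConflict() {
    return rollbackReplaceOnConflict;
  }

  List<String> files() {
    return files;
  }

  boolean isCommitted() {
    return committed;
  }
}

// How an engine might map a session or runtime option onto the action.
final class EngineSketch {
  // Hypothetical option key, not an existing Iceberg or engine property.
  static final String OPTION = "rollback-replace-on-conflict";

  private EngineSketch() {}

  static SketchAppendFiles configure(SketchAppendFiles append, Map<String, String> options) {
    if (Boolean.parseBoolean(options.getOrDefault(OPTION, "false"))) {
      return append.rollbackReplaceOnConflict();
    }
    return append;
  }
}
```

With this shape the behavior is chosen per operation by the caller, and the
engine decides how the option is surfaced to users.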
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]