szehon-ho commented on a change in pull request #3069:
URL: https://github.com/apache/iceberg/pull/3069#discussion_r709735067
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -316,6 +316,58 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin
     }
   }
+  /**
+   * Validates that no delete files matching a filter have been added to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the operation
+   * @param dataFilter an expression used to find new conflicting delete files
+   * @param caseSensitive whether expression evaluation should be case-sensitive
+   */
+
+  protected void validateNoNewDeletes(TableMetadata base, Long startingSnapshotId,
+                                      Expression dataFilter, boolean caseSensitive) {
+    // if there is no current table state, no files have been added
+    if (base.currentSnapshot() == null) {
+      return;
+    }
+
+    Pair<List<ManifestFile>, Set<Long>> history =
+        validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
+    List<ManifestFile> deleteManifests = history.first();
+
+    long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
+    DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive);
+
+    ValidationException.check(deletes.isEmpty(),
Review comment:
Thanks for adding this!
##########
File path: api/src/main/java/org/apache/iceberg/RowDelta.java
##########
@@ -111,4 +111,19 @@
    * @return this for method chaining
    */
   RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter);
+
+  /**
+   * Enables validation that delete files added concurrently do not conflict with this commit's operation.
+   * <p>
+   * This method must be called when the table is queried to produce a row delta for UPDATE and
+   * MERGE operations independently of the isolation level. Calling this method isn't required
+   * for DELETE operations as it is OK when a particular record we are trying to delete
+   * was deleted concurrently.
+   * <p>
+   * Validation applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+   *
+   * @param conflictDetectionFilter an expression on rows in the table
+   * @return this for method chaining
+   */
+  RowDelta validateNoConflictingDeleteFiles(Expression conflictDetectionFilter);
Review comment:
Nit: should we make this consistent with the method above (i.e., omit
'Files' from the name)?
##########
File path: core/src/main/java/org/apache/iceberg/BaseRowDelta.java
##########
@@ -94,9 +102,12 @@ protected void validate(TableMetadata base) {
         validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes);
       }
-      // TODO: does this need to check new delete files?
-      if (conflictDetectionFilter != null) {
-        validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
+      if (appendConflictDetectionFilter != null) {
+        validateAddedDataFiles(base, startingSnapshotId, appendConflictDetectionFilter, caseSensitive);
+      }
+
+      if (deleteConflictDetectionFilter != null) {
+        validateNoNewDeletes(base, startingSnapshotId, deleteConflictDetectionFilter, caseSensitive);
Review comment:
Do you mean it cannot resolve conflicts within the same data file (I thought
we are passing a data filter)? Or within the same partition?
Also, for my own learning: do you mean it will be over-aggressive and report
false positives, even when rows do not actually conflict, until we make the
optimization?
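To make the question concrete, here is a minimal, self-contained sketch of how a check at file granularity can flag a conflict even when no row is touched by both operations. Everything in it (`ConflictSketch`, `DeleteFileInfo`, `fileLevelConflict`, `rowLevelConflict`, the partition strings) is hypothetical and is not Iceberg's API or the logic in this PR; it only contrasts the two granularities under the assumption that a filter selects delete files by partition.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model for illustration only; not Iceberg's DeleteFileIndex.
final class ConflictSketch {

  // A stand-in for a concurrently added delete file.
  static final class DeleteFileInfo {
    final String partition;
    final Set<Long> deletedRowIds;

    DeleteFileInfo(String partition, Set<Long> deletedRowIds) {
      this.partition = partition;
      this.deletedRowIds = deletedRowIds;
    }
  }

  // Conservative check: any new delete file in the filtered partition is a conflict.
  static boolean fileLevelConflict(List<DeleteFileInfo> newDeletes, String partitionFilter) {
    return newDeletes.stream().anyMatch(d -> d.partition.equals(partitionFilter));
  }

  // Precise check: a conflict only if a concurrently deleted row is one we also touch.
  static boolean rowLevelConflict(
      List<DeleteFileInfo> newDeletes, String partitionFilter, Set<Long> rowsWeUpdate) {
    return newDeletes.stream()
        .filter(d -> d.partition.equals(partitionFilter))
        .anyMatch(d -> d.deletedRowIds.stream().anyMatch(rowsWeUpdate::contains));
  }

  public static void main(String[] args) {
    List<DeleteFileInfo> concurrent =
        List.of(new DeleteFileInfo("day=2021-09-01", Set.of(7L)));
    Set<Long> rowsWeUpdate = Set.of(1L, 2L);

    // The file-level check reports a conflict; the row-level check does not.
    System.out.println(fileLevelConflict(concurrent, "day=2021-09-01"));
    System.out.println(rowLevelConflict(concurrent, "day=2021-09-01", rowsWeUpdate));
  }
}
```

In this toy model the file-level check is cheaper because it never inspects row contents, which is the usual reason to start with the conservative version and refine later.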
##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -86,6 +86,20 @@ public boolean isEmpty() {
     return (globalDeletes == null || globalDeletes.length == 0) && sortedDeletesByPartition.isEmpty();
   }
+  public List<DeleteFile> referencedDeleteFiles() {
+    List<DeleteFile> deleteFiles = Lists.newArrayList();
Review comment:
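Optional comment: a small optimization is possible by returning early when
the index is empty and presizing the list from the known lengths:
```
if (isEmpty()) {
  return Collections.emptyList();
}

// globalDeletes can be null even when partition deletes exist
int globalCount = globalDeletes == null ? 0 : globalDeletes.length;
List<DeleteFile> deleteFiles =
    Lists.newArrayListWithExpectedSize(globalCount + sortedDeletesByPartition.size());
...
```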
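For reference, the same early-return and presizing idea can be sketched with plain JDK collections. The names here (`PresizeSketch`, `collect`) and the use of `String` in place of `DeleteFile` are assumptions made for illustration; the real field types in `DeleteFileIndex` may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the suggestion above; Strings play the role of delete files.
final class PresizeSketch {

  static List<String> collect(String[] globalDeletes, Map<String, String[]> deletesByPartition) {
    int globalCount = globalDeletes == null ? 0 : globalDeletes.length;

    // Early return: nothing to copy, so avoid allocating a list at all.
    if (globalCount == 0 && deletesByPartition.isEmpty()) {
      return Collections.emptyList();
    }

    // Compute the exact number of entries so the backing array never has to grow.
    int expected = globalCount;
    for (String[] files : deletesByPartition.values()) {
      expected += files.length;
    }

    List<String> result = new ArrayList<>(expected);
    if (globalDeletes != null) {
      Collections.addAll(result, globalDeletes);
    }
    for (String[] files : deletesByPartition.values()) {
      Collections.addAll(result, files);
    }
    return result;
  }
}
```

Summing the per-partition array lengths gives an exact capacity; using only the number of partitions would be a lower bound, which still helps but may leave room for one or more resizes.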
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.