This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d4056530d2 Core: Fix lock acquisition logic in HadoopTableOperations
rename (#9498)
d4056530d2 is described below
commit d4056530d27864adb6cf141d85c81adde46c7b28
Author: N-o-Z <[email protected]>
AuthorDate: Wed Jan 17 23:02:47 2024 +0200
Core: Fix lock acquisition logic in HadoopTableOperations rename (#9498)
---
.../iceberg/hadoop/HadoopTableOperations.java | 10 ++++--
.../apache/iceberg/hadoop/TestHadoopCommits.java | 40 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index 44936f2514..9ef2c63e26 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -360,7 +360,11 @@ public class HadoopTableOperations implements
TableOperations {
*/
private void renameToFinal(FileSystem fs, Path src, Path dst, int
nextVersion) {
try {
- lockManager.acquire(dst.toString(), src.toString());
+ if (!lockManager.acquire(dst.toString(), src.toString())) {
+ throw new CommitFailedException(
+ "Failed to acquire lock on file: %s with owner: %s", dst, src);
+ }
+
if (fs.exists(dst)) {
throw new CommitFailedException("Version %d already exists: %s",
nextVersion, dst);
}
@@ -383,7 +387,9 @@ public class HadoopTableOperations implements
TableOperations {
}
throw cfe;
} finally {
- lockManager.release(dst.toString(), src.toString());
+ if (!lockManager.release(dst.toString(), src.toString())) {
+ LOG.warn("Failed to release lock on file: {} with owner: {}", dst,
src);
+ }
}
}
diff --git
a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
index e02b9deaee..b3ddc09c0f 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
@@ -30,18 +30,22 @@ import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.LockManager;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -451,4 +455,40 @@ public class TestHadoopCommits extends HadoopTableTestBase
{
Assertions.assertThat(Lists.newArrayList(tableWithHighRetries.snapshots()))
.hasSize(threadsCount * numberOfCommitedFilesPerThread);
}
+
+ @Test
+ public void testCommitFailedToAcquireLock() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ Configuration conf = new Configuration();
+ LockManager lockManager = new NoLockManager();
+ HadoopTableOperations tableOperations =
+ new HadoopTableOperations(
+ new Path(table.location()), new HadoopFileIO(conf), conf,
lockManager);
+ tableOperations.refresh();
+ BaseTable baseTable = (BaseTable) table;
+ TableMetadata meta2 = baseTable.operations().current();
+ Assertions.assertThatThrownBy(() ->
tableOperations.commit(tableOperations.current(), meta2))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageStartingWith("Failed to acquire lock on file");
+ }
+
+ // Always returns false when trying to acquire
+ static class NoLockManager implements LockManager {
+
+ @Override
+ public boolean acquire(String entityId, String ownerId) {
+ return false;
+ }
+
+ @Override
+ public boolean release(String entityId, String ownerId) {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public void initialize(Map<String, String> properties) {}
+ }
}