This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 51c18cd48da HIVE-28373: FlakyTest: TestHiveHadoopCommits (#6345)
51c18cd48da is described below
commit 51c18cd48dabfcd7ba104ddfc2bfcbf8a51854ec
Author: PLASH SPEED <[email protected]>
AuthorDate: Fri Mar 13 20:51:22 2026 +0800
HIVE-28373: FlakyTest: TestHiveHadoopCommits (#6345)
---
.../iceberg/hadoop/TestHiveHadoopCommits.java | 113 ++++++++-------------
1 file changed, 40 insertions(+), 73 deletions(-)
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hadoop/TestHiveHadoopCommits.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hadoop/TestHiveHadoopCommits.java
index c304b7c205e..c69f6bd9082 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hadoop/TestHiveHadoopCommits.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hadoop/TestHiveHadoopCommits.java
@@ -21,6 +21,9 @@
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -34,6 +37,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.util.Tasks;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -55,7 +59,7 @@ void testCommitFailedBeforeChangeVersionHint() {
BaseTable baseTable = (BaseTable) table;
HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
- HadoopTableOperations spyOps2 = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps2 = spy(tableOperations);
doReturn(10000).when(spyOps2).findVersionWithOutVersionHint(any());
TableMetadata metadataV1 = spyOps2.current();
SortOrder dataSort =
SortOrder.builderFor(baseTable.schema()).asc("data").build();
@@ -64,7 +68,7 @@ void testCommitFailedBeforeChangeVersionHint() {
.isInstanceOf(CommitFailedException.class)
.hasMessageContaining("Are there other clients running in parallel
with the current task?");
- HadoopTableOperations spyOps3 = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps3 = spy(tableOperations);
doReturn(false).when(spyOps3).nextVersionIsLatest(anyInt(), anyInt());
assertCommitNotChangeVersion(
baseTable,
@@ -72,7 +76,7 @@ void testCommitFailedBeforeChangeVersionHint() {
CommitFailedException.class,
"Are there other clients running in parallel with the current
task?");
- HadoopTableOperations spyOps4 = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps4 = spy(tableOperations);
doThrow(new RuntimeException("FileSystem crash!"))
.when(spyOps4)
.renameMetaDataFileAndCheck(any(), any(), any(), anyBoolean());
@@ -85,7 +89,7 @@ void testCommitFailedAndCheckFailed() throws IOException {
table.newFastAppend().appendFile(FILE_A).commit();
BaseTable baseTable = (BaseTable) table;
HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
- HadoopTableOperations spyOps = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps = spy(tableOperations);
doThrow(new IOException("FileSystem crash!"))
.when(spyOps)
.renameMetaDataFile(any(), any(), any());
@@ -95,13 +99,13 @@ void testCommitFailedAndCheckFailed() throws IOException {
assertCommitNotChangeVersion(
baseTable, spyOps, CommitStateUnknownException.class, "FileSystem
crash!");
- HadoopTableOperations spyOps2 = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps2 = spy(tableOperations);
doThrow(new OutOfMemoryError("Java heap space"))
.when(spyOps2)
.renameMetaDataFile(any(), any(), any());
assertCommitFail(baseTable, spyOps2, OutOfMemoryError.class, "Java heap
space");
- HadoopTableOperations spyOps3 = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps3 = spy(tableOperations);
doThrow(new RuntimeException("UNKNOWN ERROR"))
.when(spyOps3)
.renameMetaDataFile(any(), any(), any());
@@ -114,7 +118,7 @@ void testCommitFailedAndRenameNotSuccess() throws
IOException {
table.newFastAppend().appendFile(FILE_A).commit();
BaseTable baseTable = (BaseTable) table;
HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
- HadoopTableOperations spyOps = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps = spy(tableOperations);
doThrow(new IOException("FileSystem crash!"))
.when(spyOps)
.renameMetaDataFile(any(), any(), any());
@@ -130,7 +134,7 @@ void testCommitFailedButActualSuccess() throws IOException {
table.newFastAppend().appendFile(FILE_A).commit();
BaseTable baseTable = (BaseTable) table;
HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
- HadoopTableOperations spyOps = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps = spy(tableOperations);
doThrow(new IOException("FileSystem crash!"))
.when(spyOps)
.renameMetaDataFile(any(), any(), any());
@@ -174,7 +178,7 @@ void testCommitFailedAfterChangeVersionHintRepeatCommit() {
table.newFastAppend().appendFile(FILE_A).commit();
BaseTable baseTable = (BaseTable) table;
HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
- HadoopTableOperations spyOps = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps = spy(tableOperations);
doThrow(new RuntimeException("FileSystem crash!"))
.when(spyOps)
.deleteRemovedMetadataFiles(any(), any());
@@ -204,22 +208,15 @@ void testCommitFailedAfterChangeVersionHintRepeatCommit()
{
@Test
void testTwoClientCommitSameVersion() throws InterruptedException {
- // In the linux environment, the JDK FileSystem interface implementation
class is
- // java.io.UnixFileSystem.
- // Its behavior follows the posix protocol, which causes rename operations
to overwrite the
- // target file (because linux is compatible with some of the unix
interfaces).
- // However, linux also supports renaming without overwriting the target
file. In addition, some
- // other file systems such as Windows, HDFS, etc. also support renaming
without overwriting the
- // target file.
- // We use the `mv -n` command to simulate the behavior of such filesystems.
table.newFastAppend().appendFile(FILE_A).commit();
- ExecutorService executorService = Executors.newFixedThreadPool(8);
+ ExecutorService executorService =
Executors.newVirtualThreadPerTaskExecutor();
AtomicReference<Throwable> unexpectedException = new
AtomicReference<>(null);
AtomicReference<Throwable> expectedException = new AtomicReference<>(null);
CountDownLatch countDownLatch = new CountDownLatch(2);
BaseTable baseTable = (BaseTable) table;
assertThat(((HadoopTableOperations)
baseTable.operations()).findVersion()).isEqualTo(2);
- executorService.execute(() -> {
+ final Object lock = new Object();
+ Runnable commitTask = () -> {
try {
HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
HadoopTableOperations spyOps = spy(tableOperations);
@@ -232,47 +229,16 @@ void testTwoClientCommitSameVersion() throws
InterruptedException {
doAnswer(x -> {
Path srcPath = x.getArgument(1);
Path dstPath = x.getArgument(2);
- File src = new File(srcPath.toUri());
- File dst = new File(dstPath.toUri());
- String srcPathStr = src.getAbsolutePath();
- String dstPathStr = dst.getAbsolutePath();
- String cmd = String.format("mv -n %s %s", srcPathStr, dstPathStr);
- Process process = Runtime.getRuntime().exec(cmd);
- assertThat(process.waitFor()).isZero();
- return dst.exists() && !src.exists();
- }).when(spyOps).renameMetaDataFile(any(), any(), any());
- TableMetadata metadataV1 = spyOps.current();
- SortOrder dataSort =
SortOrder.builderFor(baseTable.schema()).asc("data").build();
- TableMetadata metadataV2 = metadataV1.replaceSortOrder(dataSort);
- spyOps.commit(metadataV1, metadataV2);
- } catch (CommitFailedException e) {
- expectedException.set(e);
- } catch (Throwable e) {
- unexpectedException.set(e);
- }
- });
-
- executorService.execute(() -> {
- try {
- HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
- HadoopTableOperations spyOps = (HadoopTableOperations)
spy(tableOperations);
- doNothing().when(spyOps).tryLock(any(), any());
- doAnswer(x -> {
- countDownLatch.countDown();
- countDownLatch.await();
- return x.callRealMethod();
- }).when(spyOps).renameMetaDataFileAndCheck(any(), any(), any(),
anyBoolean());
- doAnswer(x -> {
- Path srcPath = x.getArgument(1);
- Path dstPath = x.getArgument(2);
- File src = new File(srcPath.toUri());
- File dst = new File(dstPath.toUri());
- String srcPathStr = src.getAbsolutePath();
- String dstPathStr = dst.getAbsolutePath();
- String cmd = String.format("mv -n %s %s", srcPathStr, dstPathStr);
- Process process = Runtime.getRuntime().exec(cmd);
- assertThat(process.waitFor()).isZero();
- return dst.exists() && !src.exists();
+ var src = Paths.get(srcPath.toUri());
+ var dst = Paths.get(dstPath.toUri());
+ synchronized (lock) {
+ if (Files.exists(dst)) {
+ return false;
+ } else {
+ Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
+ return true;
+ }
+ }
}).when(spyOps).renameMetaDataFile(any(), any(), any());
TableMetadata metadataV1 = spyOps.current();
SortOrder dataSort =
SortOrder.builderFor(baseTable.schema()).asc("data").build();
@@ -283,7 +249,8 @@ void testTwoClientCommitSameVersion() throws
InterruptedException {
} catch (Throwable e) {
unexpectedException.set(e);
}
- });
+ };
+ Tasks.range(2).executeWith(executorService).run(i -> commitTask.run());
executorService.shutdown();
if (!executorService.awaitTermination(610, TimeUnit.SECONDS)) {
executorService.shutdownNow();
@@ -302,7 +269,7 @@ void testTwoClientCommitSameVersion() throws
InterruptedException {
void testConcurrentCommitAndRejectCommitAlreadyExistsVersion()
throws InterruptedException {
table.newFastAppend().appendFile(FILE_A).commit();
- ExecutorService executorService = Executors.newFixedThreadPool(8);
+ ExecutorService executorService =
Executors.newVirtualThreadPerTaskExecutor();
AtomicReference<Throwable> unexpectedException = new
AtomicReference<>(null);
AtomicInteger commitTimes = new AtomicInteger(0);
int maxCommitTimes = 20;
@@ -312,7 +279,7 @@ void
testConcurrentCommitAndRejectCommitAlreadyExistsVersion()
try {
BaseTable baseTable = (BaseTable) table;
HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
- HadoopTableOperations spyOps = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps = spy(tableOperations);
doNothing().when(spyOps).tryLock(any(), any());
doAnswer(x -> {
countDownLatch2.countDown();
@@ -353,7 +320,7 @@ void
testRejectCommitAlreadyExistsVersionWithUsingObjectStore()
// memory locks. So we can use the local file system to simulate the use
of object storage.
table.updateProperties().set(TableProperties.OBJECT_STORE_ENABLED,
"true").commit();
table.newFastAppend().appendFile(FILE_A).commit();
- ExecutorService executorService = Executors.newFixedThreadPool(8);
+ ExecutorService executorService =
Executors.newVirtualThreadPerTaskExecutor();
AtomicReference<Throwable> unexpectedException = new
AtomicReference<>(null);
AtomicInteger commitTimes = new AtomicInteger(0);
int maxCommitTimes = 20;
@@ -363,7 +330,7 @@ void
testRejectCommitAlreadyExistsVersionWithUsingObjectStore()
try {
BaseTable baseTable = (BaseTable) table;
HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
- HadoopTableOperations spyOps = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps = spy(tableOperations);
doAnswer(x -> {
countDownLatch2.countDown();
countDownLatch.await();
@@ -403,7 +370,7 @@ void testConcurrentCommitAndRejectTooOldCommit() throws
InterruptedException {
table.updateProperties().set(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
"2").commit();
table.updateProperties().set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
"true")
.commit();
- ExecutorService executorService = Executors.newFixedThreadPool(8);
+ ExecutorService executorService =
Executors.newVirtualThreadPerTaskExecutor();
AtomicReference<Throwable> unexpectedException = new
AtomicReference<>(null);
AtomicInteger commitTimes = new AtomicInteger(0);
int maxCommitTimes = 20;
@@ -414,7 +381,7 @@ void testConcurrentCommitAndRejectTooOldCommit() throws
InterruptedException {
try {
BaseTable baseTable = (BaseTable) table;
HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
- HadoopTableOperations spyOps = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps = spy(tableOperations);
doNothing().when(spyOps).tryLock(any(), any());
doAnswer(x -> {
countDownLatch2.countDown();
@@ -466,7 +433,7 @@ void testRejectTooOldCommitWithUsingObjectStore() throws
InterruptedException {
table.updateProperties()
.set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true")
.commit();
- ExecutorService executorService = Executors.newFixedThreadPool(8);
+ ExecutorService executorService =
Executors.newVirtualThreadPerTaskExecutor();
AtomicReference<Throwable> unexpectedException = new
AtomicReference<>(null);
AtomicInteger commitTimes = new AtomicInteger(0);
int maxCommitTimes = 20;
@@ -476,7 +443,7 @@ void testRejectTooOldCommitWithUsingObjectStore() throws
InterruptedException {
try {
BaseTable baseTable = (BaseTable) table;
HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
- HadoopTableOperations spyOps = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps = spy(tableOperations);
doNothing().when(spyOps).tryLock(any(), any());
doAnswer(x -> {
countDownLatch2.countDown();
@@ -521,7 +488,7 @@ void testConcurrentCommitAndRejectDirtyCommit() throws
InterruptedException {
table.updateProperties()
.set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true")
.commit();
- ExecutorService executorService = Executors.newFixedThreadPool(8);
+ ExecutorService executorService =
Executors.newVirtualThreadPerTaskExecutor();
AtomicReference<Throwable> unexpectedException = new
AtomicReference<>(null);
AtomicInteger commitTimes = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(5);
@@ -531,7 +498,7 @@ void testConcurrentCommitAndRejectDirtyCommit() throws
InterruptedException {
try {
BaseTable baseTable = (BaseTable) table;
HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
- HadoopTableOperations spyOps = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps = spy(tableOperations);
TableMetadata metadataV1 = spyOps.current();
SortOrder dataSort =
SortOrder.builderFor(baseTable.schema()).asc("data").build();
TableMetadata metadataV2 = metadataV1.replaceSortOrder(dataSort);
@@ -579,13 +546,13 @@ void testCleanTooOldDirtyCommit() throws
InterruptedException {
table.updateProperties()
.set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true")
.commit();
- ExecutorService executorService = Executors.newFixedThreadPool(8);
+ ExecutorService executorService =
Executors.newVirtualThreadPerTaskExecutor();
AtomicReference<Exception> unexpectedException = new
AtomicReference<>(null);
AtomicInteger commitTimes = new AtomicInteger(0);
int maxCommitTimes = 20;
BaseTable baseTable = (BaseTable) table;
HadoopTableOperations tableOperations = (HadoopTableOperations)
baseTable.operations();
- HadoopTableOperations spyOps = (HadoopTableOperations)
spy(tableOperations);
+ HadoopTableOperations spyOps = spy(tableOperations);
CountDownLatch countDownLatch = new CountDownLatch(5);
CountDownLatch countDownLatch2 = new CountDownLatch(1);
AtomicReference<File> dirtyCommitFile = new AtomicReference<>(null);