This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new a1d325287 [#2292] fix(server): Potential hang when
HadoopShuffleWriteHandler initialization failure in
PooledHadoopShuffleWriteHandler (#2293)
a1d325287 is described below
commit a1d325287077f60597a7d64253252e9a7c46f979
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Dec 17 11:25:13 2024 +0800
[#2292] fix(server): Potential hang when HadoopShuffleWriteHandler
initialization failure in PooledHadoopShuffleWriteHandler (#2293)
### What changes were proposed in this pull request?
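Correct the initialization flag to fix a potential hang in
PooledHadoopShuffleWriteHandler: the initialized-handler counter is now
advanced only after the underlying handler has been created and added to the
queue. Previously the counter was incremented even when handler creation threw,
so repeated failures could drive it up to maxConcurrency while the queue stayed
empty.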
### Why are the changes needed?
fix: #2292
### Does this PR introduce _any_ user-facing change?
<!--
(Please list the user-facing changes introduced by your change, including
1. Change in user-facing APIs.
2. Addition or removal of property keys.)
-->
No.
### How was this patch tested?
Unit tests (added `initializationFailureTest` to
PooledHadoopShuffleWriteHandlerTest).
---------
Co-authored-by: Junfan Zhang <[email protected]>
---
.../impl/PooledHadoopShuffleWriteHandler.java | 12 ++++---
.../impl/PooledHadoopShuffleWriteHandlerTest.java | 39 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 5 deletions(-)
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
index f46ebfdb2..f801516bf 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.storage.handler.impl;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
@@ -49,7 +50,7 @@ public class PooledHadoopShuffleWriteHandler implements
ShuffleWriteHandler {
private final int maxConcurrency;
private final String basePath;
private Function<Integer, ShuffleWriteHandler> createWriterFunc;
- private volatile int initializedHandlerCnt = 0;
+ private AtomicInteger initializedHandlerCntRef = new AtomicInteger(0);
// Only for tests
@VisibleForTesting
@@ -109,10 +110,11 @@ public class PooledHadoopShuffleWriteHandler implements
ShuffleWriteHandler {
@Override
public void write(Collection<ShufflePartitionedBlock> shuffleBlocks) throws
Exception {
- if (queue.isEmpty() && initializedHandlerCnt < maxConcurrency) {
+ if (queue.isEmpty() && initializedHandlerCntRef.get() < maxConcurrency) {
synchronized (this) {
- if (initializedHandlerCnt < maxConcurrency) {
- queue.add(createWriterFunc.apply(initializedHandlerCnt++));
+ if (initializedHandlerCntRef.get() < maxConcurrency) {
+ queue.add(createWriterFunc.apply(initializedHandlerCntRef.get()));
+ initializedHandlerCntRef.addAndGet(1);
}
}
}
@@ -132,6 +134,6 @@ public class PooledHadoopShuffleWriteHandler implements
ShuffleWriteHandler {
@VisibleForTesting
protected int getInitializedHandlerCnt() {
- return initializedHandlerCnt;
+ return initializedHandlerCntRef.get();
}
}
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
index 32e8786c8..460741db4 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
@@ -34,6 +34,7 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
public class PooledHadoopShuffleWriteHandlerTest {
@@ -41,6 +42,7 @@ public class PooledHadoopShuffleWriteHandlerTest {
private List<Integer> invokedList;
private int index;
private Runnable execution;
+ private boolean markInitializationFail = false;
FakedShuffleWriteHandler(List<Integer> invokedList, int index, Runnable
runnable) {
this.invokedList = invokedList;
@@ -48,6 +50,12 @@ public class PooledHadoopShuffleWriteHandlerTest {
this.execution = runnable;
}
+ FakedShuffleWriteHandler(boolean isMarkInitializationFail) {
+ if (isMarkInitializationFail) {
+ throw new RuntimeException("Fail to init");
+ }
+ }
+
FakedShuffleWriteHandler(
List<Integer> initializedList, List<Integer> invokedList, int index,
Runnable runnable) {
initializedList.add(index);
@@ -63,6 +71,37 @@ public class PooledHadoopShuffleWriteHandlerTest {
}
}
+ @Test
+ public void initializationFailureTest() throws Exception {
+ int maxConcurrency = 2;
+ LinkedBlockingDeque<ShuffleWriteHandler> deque = new
LinkedBlockingDeque<>(maxConcurrency);
+
+ PooledHadoopShuffleWriteHandler handler =
+ new PooledHadoopShuffleWriteHandler(
+ deque, maxConcurrency, index -> new
FakedShuffleWriteHandler(true));
+
+ // to check the initialization
+ for (int i = 0; i < maxConcurrency; i++) {
+ try {
+ handler.write(Collections.emptyList());
+ fail();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+
+ // after the initialization attempts, subsequent writes must still fail,
+ // because every previous initialization failed.
+ for (int i = 0; i < maxConcurrency; i++) {
+ try {
+ handler.write(Collections.emptyList());
+ fail();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+
@Test
public void lazyInitializeWriterHandlerTest() throws Exception {
int maxConcurrency = 5;