This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e568ba3b4c9 [bugfix](hive)Handle exceptions when submitting tasks
fails to prevent dead loop for 2.1 (#40708) (#41052)
e568ba3b4c9 is described below
commit e568ba3b4c9b44bba00208e8e09d32ef5b9b9710
Author: wuwenchi <[email protected]>
AuthorDate: Sat Sep 21 08:47:31 2024 +0800
[bugfix](hive)Handle exceptions when submitting tasks fails to prevent dead
loop for 2.1 (#40708) (#41052)
bp: #40708
---
.../doris/datasource/hive/source/HiveScanNode.java | 44 +++++++++++-----------
1 file changed, 23 insertions(+), 21 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index db4161a4e23..634c596c69f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -257,30 +257,32 @@ public class HiveScanNode extends FileQueryScanNode {
}
try {
splittersOnFlight.acquire();
- } catch (InterruptedException e) {
+ CompletableFuture.runAsync(() -> {
+ try {
+ List<Split> allFiles = Lists.newArrayList();
+ getFileSplitByPartitions(
+ cache,
Collections.singletonList(partition), allFiles, bindBrokerName);
+ if (allFiles.size() > numSplitsPerPartition.get())
{
+ numSplitsPerPartition.set(allFiles.size());
+ }
+ splitAssignment.addToQueue(allFiles);
+ } catch (IOException e) {
+ batchException.set(new
UserException(e.getMessage(), e));
+ } finally {
+ splittersOnFlight.release();
+ if (batchException.get() != null) {
+
splitAssignment.setException(batchException.get());
+ }
+ if (numFinishedPartitions.incrementAndGet() ==
prunedPartitions.size()) {
+ splitAssignment.finishSchedule();
+ }
+ }
+ }, scheduleExecutor);
+ } catch (Exception e) {
+ // When submitting a task, an exception will be thrown if
the task pool(scheduleExecutor) is full
batchException.set(new UserException(e.getMessage(), e));
break;
}
- CompletableFuture.runAsync(() -> {
- try {
- List<Split> allFiles = Lists.newArrayList();
- getFileSplitByPartitions(cache,
Collections.singletonList(partition), allFiles, bindBrokerName);
- if (allFiles.size() > numSplitsPerPartition.get()) {
- numSplitsPerPartition.set(allFiles.size());
- }
- splitAssignment.addToQueue(allFiles);
- } catch (IOException e) {
- batchException.set(new UserException(e.getMessage(),
e));
- } finally {
- splittersOnFlight.release();
- if (batchException.get() != null) {
- splitAssignment.setException(batchException.get());
- }
- if (numFinishedPartitions.incrementAndGet() ==
prunedPartitions.size()) {
- splitAssignment.finishSchedule();
- }
- }
- }, scheduleExecutor);
}
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]