This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch 0.7.x
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/0.7.x by this push:
new e73ca0a4f [AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter (#3240)
e73ca0a4f is described below
commit e73ca0a4fc9655406fc79d6df55c2b16c3b43bd4
Author: 7hong <[email protected]>
AuthorDate: Wed Oct 16 20:31:40 2024 +0800
[AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter (#3240)
* [AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter
* [AMORO-3239] Add the "ignore-filter-partition-count" parameter
* Rename the parameter "optimizer.ignore-filter-partition-count" to "self-optimizing.skip-filter-partition-count"
* Rename the parameter "self-optimizing.skip-filter-partition-count" to "refresh-tables.max-pending-partition-count"
---
.../java/org/apache/amoro/server/AmoroManagementConf.java | 6 ++++++
.../amoro/server/optimizing/plan/OptimizingEvaluator.java | 11 +++++++++--
.../amoro/server/optimizing/plan/OptimizingPlanner.java | 2 +-
.../amoro/server/table/executor/AsyncTableExecutors.java | 3 ++-
.../server/table/executor/TableRuntimeRefreshExecutor.java | 8 ++++++--
.../org/apache/amoro/server/TestDefaultOptimizingService.java | 2 +-
.../amoro/server/optimizing/plan/TestOptimizingEvaluator.java | 2 +-
amoro-ams/dist/src/main/amoro-bin/conf/config.yaml | 1 +
8 files changed, 27 insertions(+), 8 deletions(-)
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 6c798ce03..e561a9e13 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -156,6 +156,12 @@ public class AmoroManagementConf {
.defaultValue(60000L)
.withDescription("Interval for refreshing table metadata.");
+ public static final ConfigOption<Integer> REFRESH_MAX_PENDING_PARTITIONS =
+ ConfigOptions.key("refresh-tables.max-pending-partition-count")
+ .intType()
+ .defaultValue(100)
+ .withDescription("Filters will not be used beyond that number of
partitions");
+
public static final ConfigOption<Long> BLOCKER_TIMEOUT =
ConfigOptions.key("blocker.timeout")
.longType()
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
index cadf5ae2a..90a890753 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
@@ -50,6 +50,7 @@ import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
public class OptimizingEvaluator {
@@ -58,14 +59,17 @@ public class OptimizingEvaluator {
protected final MixedTable mixedTable;
protected final TableRuntime tableRuntime;
protected final TableSnapshot currentSnapshot;
+ protected final int maxPendingPartitions;
protected boolean isInitialized = false;
protected Map<String, PartitionEvaluator> partitionPlanMap =
Maps.newHashMap();
- public OptimizingEvaluator(TableRuntime tableRuntime, MixedTable table) {
+ public OptimizingEvaluator(
+ TableRuntime tableRuntime, MixedTable table, int maxPendingPartitions) {
this.tableRuntime = tableRuntime;
this.mixedTable = table;
this.currentSnapshot = IcebergTableUtil.getSnapshot(table, tableRuntime);
+ this.maxPendingPartitions = maxPendingPartitions;
}
public TableRuntime getTableRuntime() {
@@ -129,7 +133,10 @@ public class OptimizingEvaluator {
mixedTable.id(),
count,
System.currentTimeMillis() - startTime);
- partitionPlanMap.values().removeIf(plan -> !plan.isNecessary());
+ partitionPlanMap = partitionPlanMap.entrySet().stream()
+ .filter(entry -> entry.getValue().isNecessary())
+ .limit(maxPendingPartitions)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private Map<String, String> partitionProperties(Pair<Integer, StructLike>
partition) {
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
index 66f311f2f..1f77e7abf 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
@@ -64,7 +64,7 @@ public class OptimizingPlanner extends OptimizingEvaluator {
MixedTable table,
double availableCore,
long maxInputSizePerThread) {
- super(tableRuntime, table);
+ super(tableRuntime, table, Integer.MAX_VALUE);
this.partitionFilter =
tableRuntime.getPendingInput() == null
? Expressions.alwaysTrue()
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
index 0af3998fe..affd72c31 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
@@ -75,7 +75,8 @@ public class AsyncTableExecutors {
new TableRuntimeRefreshExecutor(
tableManager,
conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
- conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL));
+ conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL),
+
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
this.tagsAutoCreatingExecutor =
new TagsAutoCreatingExecutor(
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
index 7837537ed..dc4ef14b3 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
@@ -31,10 +31,13 @@ public class TableRuntimeRefreshExecutor extends
BaseTableExecutor {
// 1 minutes
private final long interval;
+ private final int maxPendingPartitions;
- public TableRuntimeRefreshExecutor(TableManager tableRuntimes, int poolSize,
long interval) {
+ public TableRuntimeRefreshExecutor(
+ TableManager tableRuntimes, int poolSize, long interval, int
maxPendingPartitions) {
super(tableRuntimes, poolSize);
this.interval = interval;
+ this.maxPendingPartitions = maxPendingPartitions;
}
@Override
@@ -48,7 +51,8 @@ public class TableRuntimeRefreshExecutor extends
BaseTableExecutor {
private void tryEvaluatingPendingInput(TableRuntime tableRuntime, MixedTable
table) {
if (tableRuntime.isOptimizingEnabled() &&
!tableRuntime.getOptimizingStatus().isProcessing()) {
- OptimizingEvaluator evaluator = new OptimizingEvaluator(tableRuntime,
table);
+ OptimizingEvaluator evaluator =
+ new OptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
if (evaluator.isNecessary()) {
OptimizingEvaluator.PendingInput pendingInput =
evaluator.getPendingInput();
logger.debug(
diff --git
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 01631f073..3451cc441 100644
---
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -411,7 +411,7 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor {
public TableRuntimeRefresher() {
- super(tableService(), 1, Integer.MAX_VALUE);
+ super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
}
void refreshPending() {
diff --git
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
index 07c77179b..87dddd5f4 100644
---
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
+++
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
@@ -110,7 +110,7 @@ public class TestOptimizingEvaluator extends
MixedTablePlanTestBase {
}
protected OptimizingEvaluator buildOptimizingEvaluator() {
- return new OptimizingEvaluator(getTableRuntime(), getMixedTable());
+ return new OptimizingEvaluator(getTableRuntime(), getMixedTable(), 100);
}
protected void assertEmptyInput(OptimizingEvaluator.PendingInput input) {
diff --git a/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml
b/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml
index cdea70084..d5d50c6f3 100644
--- a/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml
+++ b/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml
@@ -42,6 +42,7 @@ ams:
refresh-tables:
thread-count: 10
interval: 60000 # 1min
+ max-pending-partition-count: 100 # default 100
self-optimizing:
commit-thread-count: 10