This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch 0.7.x
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/0.7.x by this push:
     new e73ca0a4f [AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter (#3240)
e73ca0a4f is described below

commit e73ca0a4fc9655406fc79d6df55c2b16c3b43bd4
Author: 7hong <[email protected]>
AuthorDate: Wed Oct 16 20:31:40 2024 +0800

    [AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter (#3240)
    
    * [AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter
    
    * [AMORO-3239] Add the "ignore-filter-partition-count" parameter
    
    * Rename parameter "optimizer.ignore-filter-partition-count" to "self-optimizing.skip-filter-partition-count"
    
    * Rename parameter "self-optimizing.skip-filter-partition-count" to "refresh-tables.max-pending-partition-count"
---
 .../java/org/apache/amoro/server/AmoroManagementConf.java     |  6 ++++++
 .../amoro/server/optimizing/plan/OptimizingEvaluator.java     | 11 +++++++++--
 .../amoro/server/optimizing/plan/OptimizingPlanner.java       |  2 +-
 .../amoro/server/table/executor/AsyncTableExecutors.java      |  3 ++-
 .../server/table/executor/TableRuntimeRefreshExecutor.java    |  8 ++++++--
 .../org/apache/amoro/server/TestDefaultOptimizingService.java |  2 +-
 .../amoro/server/optimizing/plan/TestOptimizingEvaluator.java |  2 +-
 amoro-ams/dist/src/main/amoro-bin/conf/config.yaml            |  1 +
 8 files changed, 27 insertions(+), 8 deletions(-)

diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 6c798ce03..e561a9e13 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -156,6 +156,12 @@ public class AmoroManagementConf {
           .defaultValue(60000L)
           .withDescription("Interval for refreshing table metadata.");
 
+  public static final ConfigOption<Integer> REFRESH_MAX_PENDING_PARTITIONS =
+      ConfigOptions.key("refresh-tables.max-pending-partition-count")
+          .intType()
+          .defaultValue(100)
+          .withDescription("Filters will not be used beyond that number of 
partitions");
+
   public static final ConfigOption<Long> BLOCKER_TIMEOUT =
       ConfigOptions.key("blocker.timeout")
           .longType()
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
index cadf5ae2a..90a890753 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
@@ -50,6 +50,7 @@ import java.io.UncheckedIOException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class OptimizingEvaluator {
 
@@ -58,14 +59,17 @@ public class OptimizingEvaluator {
   protected final MixedTable mixedTable;
   protected final TableRuntime tableRuntime;
   protected final TableSnapshot currentSnapshot;
+  protected final int maxPendingPartitions;
   protected boolean isInitialized = false;
 
   protected Map<String, PartitionEvaluator> partitionPlanMap = 
Maps.newHashMap();
 
-  public OptimizingEvaluator(TableRuntime tableRuntime, MixedTable table) {
+  public OptimizingEvaluator(
+      TableRuntime tableRuntime, MixedTable table, int maxPendingPartitions) {
     this.tableRuntime = tableRuntime;
     this.mixedTable = table;
     this.currentSnapshot = IcebergTableUtil.getSnapshot(table, tableRuntime);
+    this.maxPendingPartitions = maxPendingPartitions;
   }
 
   public TableRuntime getTableRuntime() {
@@ -129,7 +133,10 @@ public class OptimizingEvaluator {
         mixedTable.id(),
         count,
         System.currentTimeMillis() - startTime);
-    partitionPlanMap.values().removeIf(plan -> !plan.isNecessary());
+    partitionPlanMap = partitionPlanMap.entrySet().stream()
+        .filter(entry -> entry.getValue().isNecessary())
+        .limit(maxPendingPartitions)
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
 
   private Map<String, String> partitionProperties(Pair<Integer, StructLike> 
partition) {
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
index 66f311f2f..1f77e7abf 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
@@ -64,7 +64,7 @@ public class OptimizingPlanner extends OptimizingEvaluator {
       MixedTable table,
       double availableCore,
       long maxInputSizePerThread) {
-    super(tableRuntime, table);
+    super(tableRuntime, table, Integer.MAX_VALUE);
     this.partitionFilter =
         tableRuntime.getPendingInput() == null
             ? Expressions.alwaysTrue()
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
index 0af3998fe..affd72c31 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
@@ -75,7 +75,8 @@ public class AsyncTableExecutors {
         new TableRuntimeRefreshExecutor(
             tableManager,
             conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
-            conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL));
+            conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL),
+            
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
     if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
       this.tagsAutoCreatingExecutor =
           new TagsAutoCreatingExecutor(
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
index 7837537ed..dc4ef14b3 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
@@ -31,10 +31,13 @@ public class TableRuntimeRefreshExecutor extends 
BaseTableExecutor {
 
   // 1 minutes
   private final long interval;
+  private final int maxPendingPartitions;
 
-  public TableRuntimeRefreshExecutor(TableManager tableRuntimes, int poolSize, 
long interval) {
+  public TableRuntimeRefreshExecutor(
+      TableManager tableRuntimes, int poolSize, long interval, int 
maxPendingPartitions) {
     super(tableRuntimes, poolSize);
     this.interval = interval;
+    this.maxPendingPartitions = maxPendingPartitions;
   }
 
   @Override
@@ -48,7 +51,8 @@ public class TableRuntimeRefreshExecutor extends 
BaseTableExecutor {
 
   private void tryEvaluatingPendingInput(TableRuntime tableRuntime, MixedTable 
table) {
     if (tableRuntime.isOptimizingEnabled() && 
!tableRuntime.getOptimizingStatus().isProcessing()) {
-      OptimizingEvaluator evaluator = new OptimizingEvaluator(tableRuntime, 
table);
+      OptimizingEvaluator evaluator =
+          new OptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
       if (evaluator.isNecessary()) {
         OptimizingEvaluator.PendingInput pendingInput = 
evaluator.getPendingInput();
         logger.debug(
diff --git 
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
 
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 01631f073..3451cc441 100644
--- 
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++ 
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -411,7 +411,7 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
   private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor {
 
     public TableRuntimeRefresher() {
-      super(tableService(), 1, Integer.MAX_VALUE);
+      super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
     }
 
     void refreshPending() {
diff --git 
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
 
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
index 07c77179b..87dddd5f4 100644
--- 
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
+++ 
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
@@ -110,7 +110,7 @@ public class TestOptimizingEvaluator extends 
MixedTablePlanTestBase {
   }
 
   protected OptimizingEvaluator buildOptimizingEvaluator() {
-    return new OptimizingEvaluator(getTableRuntime(), getMixedTable());
+    return new OptimizingEvaluator(getTableRuntime(), getMixedTable(), 100);
   }
 
   protected void assertEmptyInput(OptimizingEvaluator.PendingInput input) {
diff --git a/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml 
b/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml
index cdea70084..d5d50c6f3 100644
--- a/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml
+++ b/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml
@@ -42,6 +42,7 @@ ams:
   refresh-tables:
     thread-count: 10
     interval: 60000 # 1min
+    max-pending-partition-count: 100 # default 100
 
   self-optimizing:
     commit-thread-count: 10

Reply via email to