This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new accdaea396 [Improve][API] Avoid relying on index alignment between 
keySet and values in getSinkTables() (#10665)
accdaea396 is described below

commit accdaea3964d8f25f2178df58c8cf9fd42427322
Author: zoo-code <[email protected]>
AuthorDate: Mon Mar 30 21:12:51 2026 +0900

    [Improve][API] Avoid relying on index alignment between keySet and values 
in getSinkTables() (#10665)
---
 .../api/sink/multitablesink/MultiTableSink.java    | 36 ++++++++++++++--------
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  9 +-----
 2 files changed, 24 insertions(+), 21 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index 713dc96506..ff13ceeb04 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -229,27 +229,37 @@ public class MultiTableSink
     }
 
     /**
-     * Returns the list of {@link TablePath}s for all tables managed by this 
sink.
+     * Returns the list of resolved sink {@link TablePath}s for all tables 
managed by this sink.
      *
-     * <p>For each sub-sink, tries {@link 
SeaTunnelSink#getWriteCatalogTable()} first to extract the
-     * table path from the catalog table. If that is not present, falls back 
to using the {@link
-     * TablePath} key from the original sinks map.
+     * <p>Delegates to {@link #getSinkTableMapping()} and returns its values 
as a list.
      *
-     * @return the list of table paths for all managed tables
+     * @return the list of resolved sink table paths
      */
     public List<TablePath> getSinkTables() {
+        return new ArrayList<>(getSinkTableMapping().values());
+    }
 
-        List<TablePath> tablePaths = new ArrayList<>();
-        List<SeaTunnelSink> values = new ArrayList<>(sinks.values());
-        for (int i = 0; i < values.size(); i++) {
-            if (values.get(i).getWriteCatalogTable().isPresent()) {
-                tablePaths.add(
-                        ((CatalogTable) 
values.get(i).getWriteCatalogTable().get()).getTablePath());
+    /**
+     * Returns a mapping from upstream {@link TablePath} keys to their 
resolved sink table paths.
+     *
+     * <p>For each sub-sink, if {@link SeaTunnelSink#getWriteCatalogTable()} 
is present, the
+     * resolved path comes from the catalog table. Otherwise, the upstream key 
is used as-is.
+     *
+     * @return a map of upstream table paths to resolved sink table paths
+     */
+    public Map<TablePath, TablePath> getSinkTableMapping() {
+        Map<TablePath, TablePath> mapping = new HashMap<>();
+        for (Map.Entry<TablePath, SeaTunnelSink> entry : sinks.entrySet()) {
+            if (entry.getValue().getWriteCatalogTable().isPresent()) {
+                mapping.put(
+                        entry.getKey(),
+                        ((CatalogTable) 
entry.getValue().getWriteCatalogTable().get())
+                                .getTablePath());
             } else {
-                tablePaths.add(sinks.keySet().toArray(new TablePath[0])[i]);
+                mapping.put(entry.getKey(), entry.getKey());
             }
         }
-        return tablePaths;
+        return mapping;
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 7e8427af9b..9eab57e884 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -123,14 +123,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
         boolean isMulti = sinkAction.getSink() instanceof MultiTableSink;
         if (isMulti) {
             sinkTables = ((MultiTableSink) 
sinkAction.getSink()).getSinkTables();
-            TablePath[] upstreamTablePaths =
-                    ((MultiTableSink) sinkAction.getSink())
-                            .getSinks()
-                            .keySet()
-                            .toArray(new TablePath[0]);
-            for (int i = 0; i < ((MultiTableSink) 
sinkAction.getSink()).getSinks().size(); i++) {
-                tablesMaps.put(upstreamTablePaths[i], sinkTables.get(i));
-            }
+            tablesMaps.putAll(((MultiTableSink) 
sinkAction.getSink()).getSinkTableMapping());
         } else {
             Optional<CatalogTable> catalogTable = 
sinkAction.getSink().getWriteCatalogTable();
             if (catalogTable.isPresent()) {

Reply via email to