This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new accdaea396 [Improve][API] Avoid relying on index alignment between
keySet and values in getSinkTables() (#10665)
accdaea396 is described below
commit accdaea3964d8f25f2178df58c8cf9fd42427322
Author: zoo-code <[email protected]>
AuthorDate: Mon Mar 30 21:12:51 2026 +0900
[Improve][API] Avoid relying on index alignment between keySet and values
in getSinkTables() (#10665)
---
.../api/sink/multitablesink/MultiTableSink.java | 36 ++++++++++++++--------
.../engine/server/task/flow/SinkFlowLifeCycle.java | 9 +-----
2 files changed, 24 insertions(+), 21 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index 713dc96506..ff13ceeb04 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -229,27 +229,37 @@ public class MultiTableSink
}
/**
- * Returns the list of {@link TablePath}s for all tables managed by this sink.
+ * Returns the list of resolved sink {@link TablePath}s for all tables managed by this sink.
*
- * <p>For each sub-sink, tries {@link SeaTunnelSink#getWriteCatalogTable()} first to extract the
- * table path from the catalog table. If that is not present, falls back to using the {@link
- * TablePath} key from the original sinks map.
+ * <p>Delegates to {@link #getSinkTableMapping()} and returns its values as a list.
*
- * @return the list of table paths for all managed tables
+ * @return the list of resolved sink table paths
*/
public List<TablePath> getSinkTables() {
+ return new ArrayList<>(getSinkTableMapping().values());
+ }
- List<TablePath> tablePaths = new ArrayList<>();
- List<SeaTunnelSink> values = new ArrayList<>(sinks.values());
- for (int i = 0; i < values.size(); i++) {
- if (values.get(i).getWriteCatalogTable().isPresent()) {
- tablePaths.add(
- ((CatalogTable) values.get(i).getWriteCatalogTable().get()).getTablePath());
+ /**
+ * Returns a mapping from upstream {@link TablePath} keys to their resolved sink table paths.
+ *
+ * <p>For each sub-sink, if {@link SeaTunnelSink#getWriteCatalogTable()} is present, the
+ * resolved path comes from the catalog table. Otherwise, the upstream key is used as-is.
+ *
+ * @return a map of upstream table paths to resolved sink table paths
+ */
+ public Map<TablePath, TablePath> getSinkTableMapping() {
+ Map<TablePath, TablePath> mapping = new HashMap<>();
+ for (Map.Entry<TablePath, SeaTunnelSink> entry : sinks.entrySet()) {
+ if (entry.getValue().getWriteCatalogTable().isPresent()) {
+ mapping.put(
+ entry.getKey(),
+ ((CatalogTable) entry.getValue().getWriteCatalogTable().get())
+ .getTablePath());
} else {
- tablePaths.add(sinks.keySet().toArray(new TablePath[0])[i]);
+ mapping.put(entry.getKey(), entry.getKey());
}
}
- return tablePaths;
+ return mapping;
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 7e8427af9b..9eab57e884 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -123,14 +123,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
boolean isMulti = sinkAction.getSink() instanceof MultiTableSink;
if (isMulti) {
sinkTables = ((MultiTableSink) sinkAction.getSink()).getSinkTables();
- TablePath[] upstreamTablePaths =
- ((MultiTableSink) sinkAction.getSink())
- .getSinks()
- .keySet()
- .toArray(new TablePath[0]);
- for (int i = 0; i < ((MultiTableSink) sinkAction.getSink()).getSinks().size(); i++) {
- tablesMaps.put(upstreamTablePaths[i], sinkTables.get(i));
- }
+ tablesMaps.putAll(((MultiTableSink) sinkAction.getSink()).getSinkTableMapping());
} else {
Optional<CatalogTable> catalogTable = sinkAction.getSink().getWriteCatalogTable();
if (catalogTable.isPresent()) {