This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push:
new 16a9d2fe456 [FLINK-38486] Harden shutdown of system UDFs
16a9d2fe456 is described below
commit 16a9d2fe456bd711749aa1efa3c10a617915c969
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Oct 8 13:34:33 2025 +0200
[FLINK-38486] Harden shutdown of system UDFs
If a resource is lazily created in open(), close() must check it for null
before closing it. Otherwise, a failure during initialization leaves the
resource null, and close() triggers secondary failures (e.g. a
NullPointerException).
---
.../connector/file/table/FileSystemOutputFormat.java | 4 +++-
.../flink/connector/file/table/FileSystemTableSink.java | 6 ++++--
.../file/table/batch/compact/BatchFileWriter.java | 4 +++-
.../table/runtime/arrow/serializers/ArrowSerializer.java | 16 ++++++++++++----
...hArrowPythonGroupWindowAggregateFunctionOperator.java | 4 +++-
.../runtime/functions/DefaultExpressionEvaluator.java | 4 +++-
.../operators/over/BufferDataOverWindowOperator.java | 4 +++-
7 files changed, 31 insertions(+), 11 deletions(-)
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
index 3b73e95e497..5c58e2e8275 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
@@ -210,7 +210,9 @@ public class FileSystemOutputFormat<T>
@Override
public void close() throws IOException {
try {
- writer.close();
+ if (writer != null) {
+ writer.close();
+ }
} catch (Exception e) {
throw new TableException("Exception in close", e);
}
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
index e088a51664f..3ebe370af1c 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
@@ -471,8 +471,10 @@ public class FileSystemTableSink extends
AbstractFileSystemTable
@Override
public void close() throws IOException {
- this.output.flush();
- this.output.close();
+ if (output != null) {
+ this.output.flush();
+ this.output.close();
+ }
}
};
}
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
index 87996e751e1..b92067a535c 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
@@ -126,7 +126,9 @@ public class BatchFileWriter<T> extends
AbstractStreamOperator<CoordinatorInput>
public void close() throws Exception {
try {
staticPartitions.clear();
- writer.close();
+ if (writer != null) {
+ writer.close();
+ }
} catch (Exception e) {
throw new TableException("Exception in close", e);
}
diff --git
a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
index 9a9f0cee8e2..7bf4f01bf9c 100644
---
a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
+++
b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
@@ -113,10 +113,18 @@ public final class ArrowSerializer {
}
public void close() throws Exception {
- arrowStreamWriter.end();
- arrowStreamReader.close();
- rootWriter.close();
- allocator.close();
+ if (arrowStreamWriter != null) {
+ arrowStreamWriter.end();
+ }
+ if (arrowStreamReader != null) {
+ arrowStreamReader.close();
+ }
+ if (rootWriter != null) {
+ rootWriter.close();
+ }
+ if (allocator != null) {
+ allocator.close();
+ }
}
/** Creates an {@link ArrowWriter}. */
diff --git
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
index 301fd476cad..57c9b31fad4 100644
---
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
@@ -129,7 +129,9 @@ public class
BatchArrowPythonGroupWindowAggregateFunctionOperator
@Override
public void close() throws Exception {
super.close();
- windowsGrouping.close();
+ if (windowsGrouping != null) {
+ windowsGrouping.close();
+ }
}
@Override
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
index 31261d574ce..1af1cf742eb 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
@@ -76,7 +76,9 @@ public class DefaultExpressionEvaluator implements
ExpressionEvaluator {
@Override
public void close() {
try {
- instance.close();
+ if (instance != null) {
+ instance.close();
+ }
} catch (Exception e) {
throw new TableException(
String.format(
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
index 50301646260..5525ba42ab6 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
@@ -138,6 +138,8 @@ public class BufferDataOverWindowOperator extends
TableStreamOperator<RowData>
@Override
public void close() throws Exception {
super.close();
- this.currentData.close();
+ if (this.currentData != null) {
+ this.currentData.close();
+ }
}
}