This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new 16a9d2fe456 [FLINK-38486] Harden shutdown of system UDFs
16a9d2fe456 is described below

commit 16a9d2fe456bd711749aa1efa3c10a617915c969
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Oct 8 13:34:33 2025 +0200

    [FLINK-38486] Harden shutdown of system UDFs
    
    If a resource is lazily created in open(), we can only close it after
    checking for null. Otherwise, a failure during initialization will
    trigger secondary failures.
---
 .../connector/file/table/FileSystemOutputFormat.java     |  4 +++-
 .../flink/connector/file/table/FileSystemTableSink.java  |  6 ++++--
 .../file/table/batch/compact/BatchFileWriter.java        |  4 +++-
 .../table/runtime/arrow/serializers/ArrowSerializer.java | 16 ++++++++++++----
 ...hArrowPythonGroupWindowAggregateFunctionOperator.java |  4 +++-
 .../runtime/functions/DefaultExpressionEvaluator.java    |  4 +++-
 .../operators/over/BufferDataOverWindowOperator.java     |  4 +++-
 7 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
index 3b73e95e497..5c58e2e8275 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
@@ -210,7 +210,9 @@ public class FileSystemOutputFormat<T>
     @Override
     public void close() throws IOException {
         try {
-            writer.close();
+            if (writer != null) {
+                writer.close();
+            }
         } catch (Exception e) {
             throw new TableException("Exception in close", e);
         }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
index e088a51664f..3ebe370af1c 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
@@ -471,8 +471,10 @@ public class FileSystemTableSink extends AbstractFileSystemTable
 
             @Override
             public void close() throws IOException {
-                this.output.flush();
-                this.output.close();
+                if (output != null) {
+                    this.output.flush();
+                    this.output.close();
+                }
             }
         };
     }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
index 87996e751e1..b92067a535c 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
@@ -126,7 +126,9 @@ public class BatchFileWriter<T> extends AbstractStreamOperator<CoordinatorInput>
     public void close() throws Exception {
         try {
             staticPartitions.clear();
-            writer.close();
+            if (writer != null) {
+                writer.close();
+            }
         } catch (Exception e) {
             throw new TableException("Exception in close", e);
         }
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
index 9a9f0cee8e2..7bf4f01bf9c 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
@@ -113,10 +113,18 @@ public final class ArrowSerializer {
     }
 
     public void close() throws Exception {
-        arrowStreamWriter.end();
-        arrowStreamReader.close();
-        rootWriter.close();
-        allocator.close();
+        if (arrowStreamWriter != null) {
+            arrowStreamWriter.end();
+        }
+        if (arrowStreamReader != null) {
+            arrowStreamReader.close();
+        }
+        if (rootWriter != null) {
+            rootWriter.close();
+        }
+        if (allocator != null) {
+            allocator.close();
+        }
     }
 
     /** Creates an {@link ArrowWriter}. */
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
index 301fd476cad..57c9b31fad4 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
@@ -129,7 +129,9 @@ public class BatchArrowPythonGroupWindowAggregateFunctionOperator
     @Override
     public void close() throws Exception {
         super.close();
-        windowsGrouping.close();
+        if (windowsGrouping != null) {
+            windowsGrouping.close();
+        }
     }
 
     @Override
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
index 31261d574ce..1af1cf742eb 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
@@ -76,7 +76,9 @@ public class DefaultExpressionEvaluator implements ExpressionEvaluator {
     @Override
     public void close() {
         try {
-            instance.close();
+            if (instance != null) {
+                instance.close();
+            }
         } catch (Exception e) {
             throw new TableException(
                     String.format(
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
index 50301646260..5525ba42ab6 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
@@ -138,6 +138,8 @@ public class BufferDataOverWindowOperator extends TableStreamOperator<RowData>
     @Override
     public void close() throws Exception {
         super.close();
-        this.currentData.close();
+        if (this.currentData != null) {
+            this.currentData.close();
+        }
     }
 }

Reply via email to