This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new 0c958e83371 [FLINK-38486] Harden shutdown of system UDFs
0c958e83371 is described below

commit 0c958e83371b3b091b95ce81baa613b45f4dfc5a
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Oct 8 13:34:33 2025 +0200

    [FLINK-38486] Harden shutdown of system UDFs
    
    If a resource is lazily created in open, we can only close after checking 
for null. Otherwise a failure during initialization will trigger secondary 
failures.
---
 .../connector/file/table/FileSystemOutputFormat.java     |  4 +++-
 .../flink/connector/file/table/FileSystemTableSink.java  |  6 ++++--
 .../file/table/batch/compact/BatchFileWriter.java        |  4 +++-
 .../flink/metrics/otel/OpenTelemetryMetricReporter.java  |  8 +++++---
 .../flink/traces/otel/OpenTelemetryTraceReporter.java    | 12 ++++++++----
 .../table/runtime/arrow/serializers/ArrowSerializer.java | 16 ++++++++++++----
 ...hArrowPythonGroupWindowAggregateFunctionOperator.java |  4 +++-
 .../runtime/functions/DefaultExpressionEvaluator.java    |  4 +++-
 .../operators/over/BufferDataOverWindowOperator.java     |  4 +++-
 9 files changed, 44 insertions(+), 18 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
index 3b73e95e497..5c58e2e8275 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
@@ -210,7 +210,9 @@ public class FileSystemOutputFormat<T>
     @Override
     public void close() throws IOException {
         try {
-            writer.close();
+            if (writer != null) {
+                writer.close();
+            }
         } catch (Exception e) {
             throw new TableException("Exception in close", e);
         }
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
index 6890b939c51..c2f741f66e6 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
@@ -471,8 +471,10 @@ public class FileSystemTableSink extends 
AbstractFileSystemTable
 
             @Override
             public void close() throws IOException {
-                this.output.flush();
-                this.output.close();
+                if (output != null) {
+                    this.output.flush();
+                    this.output.close();
+                }
             }
         };
     }
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
index dd8e295e49b..b71c35dbc58 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
@@ -124,7 +124,9 @@ public class BatchFileWriter<T> extends 
AbstractStreamOperator<CoordinatorInput>
     public void close() throws Exception {
         try {
             staticPartitions.clear();
-            writer.close();
+            if (writer != null) {
+                writer.close();
+            }
         } catch (Exception e) {
             throw new TableException("Exception in close", e);
         }
diff --git 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
index c04be6a9bda..32799627ea0 100644
--- 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
+++ 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
@@ -100,9 +100,11 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
 
     @Override
     public void close() {
-        exporter.flush();
-        lastResult.join(1, TimeUnit.MINUTES);
-        exporter.close();
+        if (exporter != null) {
+            exporter.flush();
+            lastResult.join(1, TimeUnit.MINUTES);
+            exporter.close();
+        }
     }
 
     @Override
diff --git 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
index aae2944fd2a..d60650169a0 100644
--- 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
+++ 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
@@ -70,10 +70,14 @@ public class OpenTelemetryTraceReporter extends 
OpenTelemetryReporterBase implem
 
     @Override
     public void close() {
-        spanProcessor.forceFlush();
-        spanProcessor.close();
-        spanExporter.flush();
-        spanExporter.close();
+        if (spanProcessor != null) {
+            spanProcessor.forceFlush();
+            spanProcessor.close();
+        }
+        if (spanExporter != null) {
+            spanExporter.flush();
+            spanExporter.close();
+        }
     }
 
     private void notifyOfAddedSpanInternal(Span span, 
io.opentelemetry.api.trace.Span parent) {
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
index 9a9f0cee8e2..7bf4f01bf9c 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
@@ -113,10 +113,18 @@ public final class ArrowSerializer {
     }
 
     public void close() throws Exception {
-        arrowStreamWriter.end();
-        arrowStreamReader.close();
-        rootWriter.close();
-        allocator.close();
+        if (arrowStreamWriter != null) {
+            arrowStreamWriter.end();
+        }
+        if (arrowStreamReader != null) {
+            arrowStreamReader.close();
+        }
+        if (rootWriter != null) {
+            rootWriter.close();
+        }
+        if (allocator != null) {
+            allocator.close();
+        }
     }
 
     /** Creates an {@link ArrowWriter}. */
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
index 301fd476cad..57c9b31fad4 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
@@ -129,7 +129,9 @@ public class 
BatchArrowPythonGroupWindowAggregateFunctionOperator
     @Override
     public void close() throws Exception {
         super.close();
-        windowsGrouping.close();
+        if (windowsGrouping != null) {
+            windowsGrouping.close();
+        }
     }
 
     @Override
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
index 31261d574ce..1af1cf742eb 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
@@ -76,7 +76,9 @@ public class DefaultExpressionEvaluator implements 
ExpressionEvaluator {
     @Override
     public void close() {
         try {
-            instance.close();
+            if (instance != null) {
+                instance.close();
+            }
         } catch (Exception e) {
             throw new TableException(
                     String.format(
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
index d72f286119a..c3d40663b35 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
@@ -152,6 +152,8 @@ public class BufferDataOverWindowOperator extends 
TableStreamOperator<RowData>
     @Override
     public void close() throws Exception {
         super.close();
-        this.currentData.close();
+        if (this.currentData != null) {
+            this.currentData.close();
+        }
     }
 }

Reply via email to