This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push:
new 0c958e83371 [FLINK-38486] Harden shutdown of system UDFs
0c958e83371 is described below
commit 0c958e83371b3b091b95ce81baa613b45f4dfc5a
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Oct 8 13:34:33 2025 +0200
[FLINK-38486] Harden shutdown of system UDFs
If a resource is lazily created in open, we can only close after checking
for null. Otherwise a failure during initialization will trigger secondary
failures.
---
.../connector/file/table/FileSystemOutputFormat.java | 4 +++-
.../flink/connector/file/table/FileSystemTableSink.java | 6 ++++--
.../file/table/batch/compact/BatchFileWriter.java | 4 +++-
.../flink/metrics/otel/OpenTelemetryMetricReporter.java | 8 +++++---
.../flink/traces/otel/OpenTelemetryTraceReporter.java | 12 ++++++++----
.../table/runtime/arrow/serializers/ArrowSerializer.java | 16 ++++++++++++----
...hArrowPythonGroupWindowAggregateFunctionOperator.java | 4 +++-
.../runtime/functions/DefaultExpressionEvaluator.java | 4 +++-
.../operators/over/BufferDataOverWindowOperator.java | 4 +++-
9 files changed, 44 insertions(+), 18 deletions(-)
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
index 3b73e95e497..5c58e2e8275 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
@@ -210,7 +210,9 @@ public class FileSystemOutputFormat<T>
@Override
public void close() throws IOException {
try {
- writer.close();
+ if (writer != null) {
+ writer.close();
+ }
} catch (Exception e) {
throw new TableException("Exception in close", e);
}
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
index 6890b939c51..c2f741f66e6 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
@@ -471,8 +471,10 @@ public class FileSystemTableSink extends
AbstractFileSystemTable
@Override
public void close() throws IOException {
- this.output.flush();
- this.output.close();
+ if (output != null) {
+ this.output.flush();
+ this.output.close();
+ }
}
};
}
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
index dd8e295e49b..b71c35dbc58 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
@@ -124,7 +124,9 @@ public class BatchFileWriter<T> extends
AbstractStreamOperator<CoordinatorInput>
public void close() throws Exception {
try {
staticPartitions.clear();
- writer.close();
+ if (writer != null) {
+ writer.close();
+ }
} catch (Exception e) {
throw new TableException("Exception in close", e);
}
diff --git
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
index c04be6a9bda..32799627ea0 100644
---
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
+++
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
@@ -100,9 +100,11 @@ public class OpenTelemetryMetricReporter extends
OpenTelemetryReporterBase
@Override
public void close() {
- exporter.flush();
- lastResult.join(1, TimeUnit.MINUTES);
- exporter.close();
+ if (exporter != null) {
+ exporter.flush();
+ lastResult.join(1, TimeUnit.MINUTES);
+ exporter.close();
+ }
}
@Override
diff --git
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
index aae2944fd2a..d60650169a0 100644
---
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
+++
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
@@ -70,10 +70,14 @@ public class OpenTelemetryTraceReporter extends
OpenTelemetryReporterBase implem
@Override
public void close() {
- spanProcessor.forceFlush();
- spanProcessor.close();
- spanExporter.flush();
- spanExporter.close();
+ if (spanProcessor != null) {
+ spanProcessor.forceFlush();
+ spanProcessor.close();
+ }
+ if (spanExporter != null) {
+ spanExporter.flush();
+ spanExporter.close();
+ }
}
private void notifyOfAddedSpanInternal(Span span,
io.opentelemetry.api.trace.Span parent) {
diff --git
a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
index 9a9f0cee8e2..7bf4f01bf9c 100644
---
a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
+++
b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
@@ -113,10 +113,18 @@ public final class ArrowSerializer {
}
public void close() throws Exception {
- arrowStreamWriter.end();
- arrowStreamReader.close();
- rootWriter.close();
- allocator.close();
+ if (arrowStreamWriter != null) {
+ arrowStreamWriter.end();
+ }
+ if (arrowStreamReader != null) {
+ arrowStreamReader.close();
+ }
+ if (rootWriter != null) {
+ rootWriter.close();
+ }
+ if (allocator != null) {
+ allocator.close();
+ }
}
/** Creates an {@link ArrowWriter}. */
diff --git
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
index 301fd476cad..57c9b31fad4 100644
---
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
@@ -129,7 +129,9 @@ public class
BatchArrowPythonGroupWindowAggregateFunctionOperator
@Override
public void close() throws Exception {
super.close();
- windowsGrouping.close();
+ if (windowsGrouping != null) {
+ windowsGrouping.close();
+ }
}
@Override
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
index 31261d574ce..1af1cf742eb 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java
@@ -76,7 +76,9 @@ public class DefaultExpressionEvaluator implements
ExpressionEvaluator {
@Override
public void close() {
try {
- instance.close();
+ if (instance != null) {
+ instance.close();
+ }
} catch (Exception e) {
throw new TableException(
String.format(
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
index d72f286119a..c3d40663b35 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java
@@ -152,6 +152,8 @@ public class BufferDataOverWindowOperator extends
TableStreamOperator<RowData>
@Override
public void close() throws Exception {
super.close();
- this.currentData.close();
+ if (this.currentData != null) {
+ this.currentData.close();
+ }
}
}