This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d0fff0e266 [flink][core] Flink supports setting job level context to 
FileIO  (#5836)
d0fff0e266 is described below

commit d0fff0e26628469a3c4e563f048f5264dfaa65a8
Author: yuzelin <[email protected]>
AuthorDate: Fri Jul 4 20:46:31 2025 +0800

    [flink][core] Flink supports setting job level context to FileIO  (#5836)
---
 .../generated/flink_connector_configuration.html   |  6 ++++
 .../src/main/java/org/apache/paimon/fs/FileIO.java |  3 ++
 .../paimon/flink/AbstractFlinkTableFactory.java    | 38 ++++++++++++++--------
 .../apache/paimon/flink/FlinkConnectorOptions.java |  6 ++++
 4 files changed, 39 insertions(+), 14 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 84c3f5e2c2..cbc1f2e83c 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -44,6 +44,12 @@ under the License.
             <td>Long</td>
             <td>Optional endInput watermark used in case of batch mode or 
bounded stream.</td>
         </tr>
+        <tr>
+            <td><h5>filesystem.job-level-settings.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enable pass job level filesystem settings to table file 
IO.</td>
+        </tr>
         <tr>
             <td><h5>lookup.async</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index 201f3418e2..aed84850e9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -73,6 +73,9 @@ public interface FileIO extends Serializable, Closeable {
     /** Configure by {@link CatalogContext}. */
     void configure(CatalogContext context);
 
+    /** Set filesystem options at runtime. Usually used for job-level 
settings. */
+    default void setRuntimeContext(Map<String, String> options) {}
+
     /**
      * Opens an SeekableInputStream at the indicated Path.
      *
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 5cbdf1abc1..4568234894 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -36,6 +36,7 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.object.ObjectTable;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -74,6 +75,7 @@ import static org.apache.paimon.CoreOptions.SCAN_MODE;
 import static org.apache.paimon.CoreOptions.STREAMING_READ_MODE;
 import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT;
 import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT_FULL;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_BOUNDED;
@@ -191,7 +193,6 @@ public abstract class AbstractFlinkTableFactory
 
     Table buildPaimonTable(DynamicTableFactory.Context context) {
         CatalogTable origin = context.getCatalogTable().getOrigin();
-        Table table;
 
         Map<String, String> dynamicOptions = getDynamicConfigOptions(context);
         dynamicOptions.forEach(
@@ -225,7 +226,16 @@ public abstract class AbstractFlinkTableFactory
                 throw new RuntimeException(e);
             }
         }
-        table = fileStoreTable.copyWithoutTimeTravel(newOptions);
+
+        FileStoreTable table = 
fileStoreTable.copyWithoutTimeTravel(newOptions);
+
+        if 
(Options.fromMap(table.options()).get(FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED)) {
+            Map<String, String> runtimeContext = getAllOptions(context);
+            table.fileIO().setRuntimeContext(runtimeContext);
+            if (table instanceof ObjectTable) {
+                ((ObjectTable) 
table).objectFileIO().setRuntimeContext(runtimeContext);
+            }
+        }
 
         // notice that the Paimon table schema must be the same with the 
Flink's
         Schema schema = 
FlinkCatalog.fromCatalogTable(context.getCatalogTable());
@@ -292,18 +302,7 @@ public abstract class AbstractFlinkTableFactory
      * @return The dynamic options of this target table.
      */
     static Map<String, String> 
getDynamicConfigOptions(DynamicTableFactory.Context context) {
-
-        ReadableConfig config = context.getConfiguration();
-
-        Map<String, String> conf;
-
-        if (config instanceof Configuration) {
-            conf = ((Configuration) config).toMap();
-        } else if (config instanceof TableConfig) {
-            conf = ((TableConfig) config).getConfiguration().toMap();
-        } else {
-            throw new IllegalArgumentException("Unexpected config: " + 
config.getClass());
-        }
+        Map<String, String> conf = getAllOptions(context);
 
         String template =
                 String.format(
@@ -324,4 +323,15 @@ public abstract class AbstractFlinkTableFactory
         }
         return optionsFromTableConfig;
     }
+
+    static Map<String, String> getAllOptions(DynamicTableFactory.Context 
context) {
+        ReadableConfig config = context.getConfiguration();
+        if (config instanceof Configuration) {
+            return ((Configuration) config).toMap();
+        } else if (config instanceof TableConfig) {
+            return ((TableConfig) config).getConfiguration().toMap();
+        } else {
+            throw new IllegalArgumentException("Unexpected config: " + 
config.getClass());
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e3820b2d47..831a7ac4ef 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -537,6 +537,12 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "Controls the cache memory of writer coordinator 
to cache manifest files in Job Manager.");
 
+    public static final ConfigOption<Boolean> 
FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED =
+            key("filesystem.job-level-settings.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Enable pass job level filesystem 
settings to table file IO.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);

Reply via email to