This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d0fff0e266 [flink][core] Flink supports setting job level context to
FileIO (#5836)
d0fff0e266 is described below
commit d0fff0e26628469a3c4e563f048f5264dfaa65a8
Author: yuzelin <[email protected]>
AuthorDate: Fri Jul 4 20:46:31 2025 +0800
[flink][core] Flink supports setting job level context to FileIO (#5836)
---
.../generated/flink_connector_configuration.html | 6 ++++
.../src/main/java/org/apache/paimon/fs/FileIO.java | 3 ++
.../paimon/flink/AbstractFlinkTableFactory.java | 38 ++++++++++++++--------
.../apache/paimon/flink/FlinkConnectorOptions.java | 6 ++++
4 files changed, 39 insertions(+), 14 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 84c3f5e2c2..cbc1f2e83c 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -44,6 +44,12 @@ under the License.
<td>Long</td>
<td>Optional endInput watermark used in case of batch mode or
bounded stream.</td>
</tr>
+ <tr>
+ <td><h5>filesystem.job-level-settings.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Enable pass job level filesystem settings to table file
IO.</td>
+ </tr>
<tr>
<td><h5>lookup.async</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index 201f3418e2..aed84850e9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -73,6 +73,9 @@ public interface FileIO extends Serializable, Closeable {
/** Configure by {@link CatalogContext}. */
void configure(CatalogContext context);
+ /** Set filesystem options at runtime. Usually used for job-level
settings. */
+ default void setRuntimeContext(Map<String, String> options) {}
+
/**
* Opens an SeekableInputStream at the indicated Path.
*
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 5cbdf1abc1..4568234894 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -36,6 +36,7 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -74,6 +75,7 @@ import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.STREAMING_READ_MODE;
import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT;
import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT_FULL;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_BOUNDED;
@@ -191,7 +193,6 @@ public abstract class AbstractFlinkTableFactory
Table buildPaimonTable(DynamicTableFactory.Context context) {
CatalogTable origin = context.getCatalogTable().getOrigin();
- Table table;
Map<String, String> dynamicOptions = getDynamicConfigOptions(context);
dynamicOptions.forEach(
@@ -225,7 +226,16 @@ public abstract class AbstractFlinkTableFactory
throw new RuntimeException(e);
}
}
- table = fileStoreTable.copyWithoutTimeTravel(newOptions);
+
+ FileStoreTable table =
fileStoreTable.copyWithoutTimeTravel(newOptions);
+
+ if
(Options.fromMap(table.options()).get(FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED)) {
+ Map<String, String> runtimeContext = getAllOptions(context);
+ table.fileIO().setRuntimeContext(runtimeContext);
+ if (table instanceof ObjectTable) {
+ ((ObjectTable)
table).objectFileIO().setRuntimeContext(runtimeContext);
+ }
+ }
// notice that the Paimon table schema must be the same with the
Flink's
Schema schema =
FlinkCatalog.fromCatalogTable(context.getCatalogTable());
@@ -292,18 +302,7 @@ public abstract class AbstractFlinkTableFactory
* @return The dynamic options of this target table.
*/
static Map<String, String>
getDynamicConfigOptions(DynamicTableFactory.Context context) {
-
- ReadableConfig config = context.getConfiguration();
-
- Map<String, String> conf;
-
- if (config instanceof Configuration) {
- conf = ((Configuration) config).toMap();
- } else if (config instanceof TableConfig) {
- conf = ((TableConfig) config).getConfiguration().toMap();
- } else {
- throw new IllegalArgumentException("Unexpected config: " +
config.getClass());
- }
+ Map<String, String> conf = getAllOptions(context);
String template =
String.format(
@@ -324,4 +323,15 @@ public abstract class AbstractFlinkTableFactory
}
return optionsFromTableConfig;
}
+
+ static Map<String, String> getAllOptions(DynamicTableFactory.Context
context) {
+ ReadableConfig config = context.getConfiguration();
+ if (config instanceof Configuration) {
+ return ((Configuration) config).toMap();
+ } else if (config instanceof TableConfig) {
+ return ((TableConfig) config).getConfiguration().toMap();
+ } else {
+ throw new IllegalArgumentException("Unexpected config: " +
config.getClass());
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e3820b2d47..831a7ac4ef 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -537,6 +537,12 @@ public class FlinkConnectorOptions {
.withDescription(
"Controls the cache memory of writer coordinator
to cache manifest files in Job Manager.");
+ public static final ConfigOption<Boolean>
FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED =
+ key("filesystem.job-level-settings.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Enable pass job level filesystem
settings to table file IO.");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);