This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ec506ab892b [FLINK-38936][connectors/filesystem] Improve OS-Independent File Path Resolution for FileSystem Connector
ec506ab892b is described below

commit ec506ab892b33be442c274f194e70b21fd487435
Author: Rion Williams <[email protected]>
AuthorDate: Sat Mar 21 03:39:23 2026 -0500

    [FLINK-38936][connectors/filesystem] Improve OS-Independent File Path Resolution for FileSystem Connector
---
 .../file/table/FileSystemTableSource.java          | 12 ++-
 .../file/table/FileSystemTableSourceTest.java      | 98 ++++++++++++++++++++++
 2 files changed, 107 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
index 4e8da8fb090..0ad7b4ab5c4 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
@@ -63,7 +63,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
-import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -489,6 +488,14 @@ public class FileSystemTableSource extends AbstractFileSystemTable
         Object getValue(FileSourceSplit split);
     }
 
+    /**
+     * Extracts the file name in an OS-independent way as {@link java.nio.file.Paths} cannot handle
+     * Windows drive-letter URIs (e.g., file:/D:/...).
+     */
+    static String extractFileName(Path path) {
+        return path.getName();
+    }
+
     enum ReadableFileInfo implements Serializable {
         FILEPATH(
                 "file.path",
@@ -509,8 +516,7 @@ public class FileSystemTableSource extends AbstractFileSystemTable
 
                     @Override
                     public Object getValue(FileSourceSplit split) {
-                        return StringData.fromString(
-                                Paths.get(split.path().getPath()).getFileName().toString());
+                        return StringData.fromString(extractFileName(split.path()));
                     }
                 }),
         SIZE(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceTest.java
index 358f5ed5776..e92fa0b3600 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceTest.java
@@ -18,13 +18,23 @@
 
 package org.apache.flink.connector.file.table;
 
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.planner.utils.StreamTableTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link FileSystemTableSource}. */
 class FileSystemTableSourceTest extends TableTestBase {
@@ -80,4 +90,92 @@ class FileSystemTableSourceTest extends TableTestBase {
         util.verifyRelPlanInsert(
                 "insert into MySink(a, b, c) select a, b, filemeta from MyTableWithMeta");
     }
+
+    @ParameterizedTest(name = "extractFileName({0}) -> {1}")
+    @MethodSource("fileNameCases")
+    void testFileNameExtraction(String rawPath, String expected) {
+        String extractedFileName = FileSystemTableSource.extractFileName(new Path(rawPath));
+        assertThat(extractedFileName).isEqualTo(expected);
+    }
+
+    @ParameterizedTest(name = "{0} accessor for {1}")
+    @MethodSource("metadataAccessorCases")
+    void testMetadataAccessor(
+            FileSystemTableSource.ReadableFileInfo fileInfo, String rawPath, String expected) {
+        FileSourceSplit split =
+                new FileSourceSplit("test-split", new Path(rawPath), 0L, 1L, 0L, 1L);
+
+        Object actual = fileInfo.getAccessor().getValue(split);
+
+        assertThat(actual).isEqualTo(StringData.fromString(expected));
+    }
+
+    static Stream<Arguments> fileNameCases() {
+        return Stream.of(
+                Arguments.of("file.txt", "file.txt"),
+                Arguments.of("/tmp/input/dir/file.txt", "file.txt"),
+                Arguments.of("tmp/input/dir/file.txt", "file.txt"),
+                Arguments.of("./local/file.txt", "file.txt"),
+                Arguments.of("../input/file.txt", "file.txt"),
+                Arguments.of("../../data/report.csv", "report.csv"),
+                Arguments.of("file:///tmp/input/user.csv", "user.csv"),
+                Arguments.of("file://localhost/tmp/input/user.csv", "user.csv"),
+                Arguments.of("file:/tmp/input/dir/", "dir"),
+                Arguments.of("file:/C:/Users/me/Desktop/thing.txt", "thing.txt"),
+                Arguments.of("file:/D:/tmp/input/test.csv", "test.csv"),
+                Arguments.of("file:/D:/AI-Book/FlinkApplication/data/input/user.csv", "user.csv"),
+                Arguments.of("s3://bucket/a/b/c.parquet", "c.parquet"),
+                Arguments.of("hdfs://node:8020/data/output/result.parquet", "result.parquet"),
+                Arguments.of("gs://my-bucket/path/to/data.json", "data.json"),
+                Arguments.of("/tmp/input/archive.tar.gz", "archive.tar.gz"),
+                Arguments.of("s3://bucket/data/backup.2026.01.01.csv", "backup.2026.01.01.csv"),
+                Arguments.of("/tmp/input/spacing test.txt", "spacing test.txt"),
+                Arguments.of("/tmp/input/report (final).csv", "report (final).csv"),
+                Arguments.of("/tmp/input/file_name-v2.txt", "file_name-v2.txt"),
+                Arguments.of("/tmp/input/it's a file.txt", "it's a file.txt"),
+                Arguments.of("/tmp/input/it's a \"quoted\" name.txt", "it's a \"quoted\" name.txt"),
+                Arguments.of("file:///tmp/input/my%20file.txt", "my%20file.txt"));
+    }
+
+    static Stream<Arguments> filePathCases() {
+        return Stream.of(
+                Arguments.of("file.txt", "file.txt"),
+                Arguments.of("/tmp/input/dir/file.txt", "/tmp/input/dir/file.txt"),
+                Arguments.of("tmp/input/dir/file.txt", "tmp/input/dir/file.txt"),
+                Arguments.of("./local/file.txt", "local/file.txt"),
+                Arguments.of("../input/file.txt", "../input/file.txt"),
+                Arguments.of("../../data/report.csv", "../../data/report.csv"),
+                Arguments.of("file:///tmp/input/user.csv", "/tmp/input/user.csv"),
+                Arguments.of("file://localhost/tmp/input/user.csv", "/tmp/input/user.csv"),
+                Arguments.of(
+                        "file://localhost/tmp/weird \"quoted\" directory/user.csv",
+                        "/tmp/weird \"quoted\" directory/user.csv"),
+                Arguments.of("file:/tmp/input/dir/", "/tmp/input/dir"),
+                Arguments.of(
+                        "file:/C:/Users/me/Desktop/thing.txt", "/C:/Users/me/Desktop/thing.txt"),
+                Arguments.of("file:/D:/tmp/input/test.csv", "/D:/tmp/input/test.csv"),
+                Arguments.of(
+                        "file:/D:/AI-Book/FlinkApplication/data/input/user.csv",
+                        "/D:/AI-Book/FlinkApplication/data/input/user.csv"),
+                Arguments.of("s3://bucket/a/b/c.parquet", "/a/b/c.parquet"),
+                Arguments.of(
+                        "hdfs://node:8020/data/output/result.parquet",
+                        "/data/output/result.parquet"),
+                Arguments.of("gs://my-bucket/path/to/data.json", "/path/to/data.json"),
+                Arguments.of("/tmp/input/archive.tar.gz", "/tmp/input/archive.tar.gz"),
+                Arguments.of(
+                        "s3://bucket/data/backup.2026.01.01.csv", "/data/backup.2026.01.01.csv"),
+                Arguments.of("/tmp/input/file_name-v2.txt", "/tmp/input/file_name-v2.txt"));
+    }
+
+    static Stream<Arguments> metadataAccessorCases() {
+        return Stream.concat(
+                withInfo(FileSystemTableSource.ReadableFileInfo.FILENAME, fileNameCases()),
+                withInfo(FileSystemTableSource.ReadableFileInfo.FILEPATH, filePathCases()));
+    }
+
+    private static Stream<Arguments> withInfo(
+            FileSystemTableSource.ReadableFileInfo info, Stream<Arguments> cases) {
+        return cases.map(args -> Arguments.of(info, args.get()[0], args.get()[1]));
+    }
 }

Reply via email to