This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ec506ab892b [FLINK-38936][connectors/filesystem] Improve
OS-Independent File Path Resolution for FileSystem Connector
ec506ab892b is described below
commit ec506ab892b33be442c274f194e70b21fd487435
Author: Rion Williams <[email protected]>
AuthorDate: Sat Mar 21 03:39:23 2026 -0500
[FLINK-38936][connectors/filesystem] Improve OS-Independent File Path
Resolution for FileSystem Connector
---
.../file/table/FileSystemTableSource.java | 12 ++-
.../file/table/FileSystemTableSourceTest.java | 98 ++++++++++++++++++++++
2 files changed, 107 insertions(+), 3 deletions(-)
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
index 4e8da8fb090..0ad7b4ab5c4 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
@@ -63,7 +63,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.Serializable;
-import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -489,6 +488,14 @@ public class FileSystemTableSource extends
AbstractFileSystemTable
Object getValue(FileSourceSplit split);
}
+ /**
+ * Returns the file name of the given path by delegating to
+ * {@code path.getName()}.
+ *
+ * <p>The name is extracted in an OS-independent way, as
+ * {@link java.nio.file.Paths} cannot handle Windows drive-letter URIs
+ * (e.g., file:/D:/...).
+ *
+ * @param path the path whose file name is extracted
+ * @return the value of {@code path.getName()}
+ */
+ static String extractFileName(Path path) {
+ return path.getName();
+ }
+
enum ReadableFileInfo implements Serializable {
FILEPATH(
"file.path",
@@ -509,8 +516,7 @@ public class FileSystemTableSource extends
AbstractFileSystemTable
@Override
public Object getValue(FileSourceSplit split) {
- return StringData.fromString(
-
Paths.get(split.path().getPath()).getFileName().toString());
+ return
StringData.fromString(extractFileName(split.path()));
}
}),
SIZE(
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceTest.java
index 358f5ed5776..e92fa0b3600 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceTest.java
@@ -18,13 +18,23 @@
package org.apache.flink.connector.file.table;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link FileSystemTableSource}. */
class FileSystemTableSourceTest extends TableTestBase {
@@ -80,4 +90,92 @@ class FileSystemTableSourceTest extends TableTestBase {
util.verifyRelPlanInsert(
"insert into MySink(a, b, c) select a, b, filemeta from
MyTableWithMeta");
}
+ /**
+ * Checks that {@code FileSystemTableSource.extractFileName} returns the
+ * expected file name for each raw path supplied by
+ * {@code fileNameCases()}.
+ */
+ @ParameterizedTest(name = "extractFileName({0}) -> {1}")
+ @MethodSource("fileNameCases")
+ void testFileNameExtraction(String rawPath, String expected) {
+ String extractedFileName = FileSystemTableSource.extractFileName(new
Path(rawPath));
+ assertThat(extractedFileName).isEqualTo(expected);
+ }
+ /**
+ * Checks that the accessor of the given {@code ReadableFileInfo} yields
+ * the expected value, compared as {@code StringData}, for a
+ * {@code FileSourceSplit} built from the raw path. Cases come from
+ * {@code metadataAccessorCases()}.
+ */
+ @ParameterizedTest(name = "{0} accessor for {1}")
+ @MethodSource("metadataAccessorCases")
+ void testMetadataAccessor(
+ FileSystemTableSource.ReadableFileInfo fileInfo, String rawPath,
String expected) {
+ FileSourceSplit split =
+ new FileSourceSplit("test-split", new Path(rawPath), 0L, 1L,
0L, 1L);
+
+ Object actual = fileInfo.getAccessor().getValue(split);
+
+ assertThat(actual).isEqualTo(StringData.fromString(expected));
+ }
+ /**
+ * Supplies (raw path, expected file name) pairs. The cases cover bare,
+ * relative and absolute paths, file/s3/hdfs/gs URIs, Windows
+ * drive-letter URIs, a directory path with a trailing slash, multi-dot
+ * names, and names containing spaces, quotes, parentheses or a
+ * percent-encoded sequence. The percent-encoded case expects the name
+ * to be returned without decoding.
+ */
+ static Stream<Arguments> fileNameCases() {
+ return Stream.of(
+ Arguments.of("file.txt", "file.txt"),
+ Arguments.of("/tmp/input/dir/file.txt", "file.txt"),
+ Arguments.of("tmp/input/dir/file.txt", "file.txt"),
+ Arguments.of("./local/file.txt", "file.txt"),
+ Arguments.of("../input/file.txt", "file.txt"),
+ Arguments.of("../../data/report.csv", "report.csv"),
+ Arguments.of("file:///tmp/input/user.csv", "user.csv"),
+ Arguments.of("file://localhost/tmp/input/user.csv",
"user.csv"),
+ Arguments.of("file:/tmp/input/dir/", "dir"),
+ Arguments.of("file:/C:/Users/me/Desktop/thing.txt",
"thing.txt"),
+ Arguments.of("file:/D:/tmp/input/test.csv", "test.csv"),
+
Arguments.of("file:/D:/AI-Book/FlinkApplication/data/input/user.csv",
"user.csv"),
+ Arguments.of("s3://bucket/a/b/c.parquet", "c.parquet"),
+ Arguments.of("hdfs://node:8020/data/output/result.parquet",
"result.parquet"),
+ Arguments.of("gs://my-bucket/path/to/data.json", "data.json"),
+ Arguments.of("/tmp/input/archive.tar.gz", "archive.tar.gz"),
+ Arguments.of("s3://bucket/data/backup.2026.01.01.csv",
"backup.2026.01.01.csv"),
+ Arguments.of("/tmp/input/spacing test.txt", "spacing
test.txt"),
+ Arguments.of("/tmp/input/report (final).csv", "report
(final).csv"),
+ Arguments.of("/tmp/input/file_name-v2.txt",
"file_name-v2.txt"),
+ Arguments.of("/tmp/input/it's a file.txt", "it's a file.txt"),
+ Arguments.of("/tmp/input/it's a \"quoted\" name.txt", "it's a
\"quoted\" name.txt"),
+ Arguments.of("file:///tmp/input/my%20file.txt",
"my%20file.txt"));
+ }
+ /**
+ * Supplies (raw path, expected path string) pairs, used for the
+ * FILEPATH accessor. Per the expectations listed here, the scheme and
+ * authority of a URI are dropped, a leading "./" and a trailing "/" are
+ * removed, "../" segments are kept as written, and Windows drive-letter
+ * URIs keep a leading "/" before the drive letter (e.g. "/D:/...").
+ */
+ static Stream<Arguments> filePathCases() {
+ return Stream.of(
+ Arguments.of("file.txt", "file.txt"),
+ Arguments.of("/tmp/input/dir/file.txt",
"/tmp/input/dir/file.txt"),
+ Arguments.of("tmp/input/dir/file.txt",
"tmp/input/dir/file.txt"),
+ Arguments.of("./local/file.txt", "local/file.txt"),
+ Arguments.of("../input/file.txt", "../input/file.txt"),
+ Arguments.of("../../data/report.csv", "../../data/report.csv"),
+ Arguments.of("file:///tmp/input/user.csv",
"/tmp/input/user.csv"),
+ Arguments.of("file://localhost/tmp/input/user.csv",
"/tmp/input/user.csv"),
+ Arguments.of(
+ "file://localhost/tmp/weird \"quoted\"
directory/user.csv",
+ "/tmp/weird \"quoted\" directory/user.csv"),
+ Arguments.of("file:/tmp/input/dir/", "/tmp/input/dir"),
+ Arguments.of(
+ "file:/C:/Users/me/Desktop/thing.txt",
"/C:/Users/me/Desktop/thing.txt"),
+ Arguments.of("file:/D:/tmp/input/test.csv",
"/D:/tmp/input/test.csv"),
+ Arguments.of(
+
"file:/D:/AI-Book/FlinkApplication/data/input/user.csv",
+ "/D:/AI-Book/FlinkApplication/data/input/user.csv"),
+ Arguments.of("s3://bucket/a/b/c.parquet", "/a/b/c.parquet"),
+ Arguments.of(
+ "hdfs://node:8020/data/output/result.parquet",
+ "/data/output/result.parquet"),
+ Arguments.of("gs://my-bucket/path/to/data.json",
"/path/to/data.json"),
+ Arguments.of("/tmp/input/archive.tar.gz",
"/tmp/input/archive.tar.gz"),
+ Arguments.of(
+ "s3://bucket/data/backup.2026.01.01.csv",
"/data/backup.2026.01.01.csv"),
+ Arguments.of("/tmp/input/file_name-v2.txt",
"/tmp/input/file_name-v2.txt"));
+ }
+ /**
+ * Supplies (file info, raw path, expected value) triples by tagging
+ * {@code fileNameCases()} with FILENAME and {@code filePathCases()}
+ * with FILEPATH, then concatenating the two streams in that order.
+ */
+ static Stream<Arguments> metadataAccessorCases() {
+ return Stream.concat(
+ withInfo(FileSystemTableSource.ReadableFileInfo.FILENAME,
fileNameCases()),
+ withInfo(FileSystemTableSource.ReadableFileInfo.FILEPATH,
filePathCases()));
+ }
+ /**
+ * Prepends {@code info} to every case in {@code cases}, producing
+ * (info, first element, second element) arguments. Only the first two
+ * elements of each incoming case are carried over.
+ */
+ private static Stream<Arguments> withInfo(
+ FileSystemTableSource.ReadableFileInfo info, Stream<Arguments>
cases) {
+ return cases.map(args -> Arguments.of(info, args.get()[0],
args.get()[1]));
+ }
}