This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ec8919a877 [Bug][Connector-V2][File] Fix IndexOutOfBoundsException when reading empty directories (#10373)
ec8919a877 is described below

commit ec8919a8776271f9a04a2355607eab63f94a8c63
Author: yzeng1618 <[email protected]>
AuthorDate: Fri Jan 23 16:13:52 2026 +0800

    [Bug][Connector-V2][File] Fix IndexOutOfBoundsException when reading empty directories (#10373)
    
    Co-authored-by: zengyi <[email protected]>
---
 .../file/source/reader/AbstractReadStrategy.java   | 15 +++-
 .../file/source/reader/CsvReadStrategy.java        |  7 +-
 .../file/source/reader/ExcelReadStrategy.java      |  5 +-
 .../file/source/reader/TextReadStrategy.java       |  7 +-
 .../file/source/reader/XmlReadStrategy.java        |  5 +-
 .../source/reader/AbstractReadStrategyTest.java    | 93 ++++++++++++++++++++++
 .../e2e/connector/file/hdfs/HdfsFileIT.java        | 13 +++
 .../test/resources/hdfs_empty_text_to_assert.conf  | 52 ++++++++++++
 8 files changed, 186 insertions(+), 11 deletions(-)
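
For quick reference, the heart of the fix is the new fallback in AbstractReadStrategy: when the matched file list is empty, partition inference falls back to the caller-supplied path or the configured source root instead of calling fileNames.get(0). Below is a minimal, dependency-free sketch of that behaviour (method and field names mirror the diff that follows; the wrapper class and main() harness are hypothetical, and the blank check is inlined where the patch uses commons-lang3 StringUtils):

    import java.util.ArrayList;
    import java.util.List;

    public class EmptyDirectoryPartitionSketch {
        // Mirrors the AbstractReadStrategy state relevant to the fix.
        private final List<String> fileNames = new ArrayList<>(); // empty directory: no files matched
        private final String sourceRootPath = "/tmp/dt=2024-01-01";

        // Same precedence as the new getPathForPartitionInference():
        // first matched file, then the caller-supplied path, then the source root.
        String getPathForPartitionInference(String fallbackPath) {
            if (!fileNames.isEmpty()) {
                return fileNames.get(0);
            }
            if (fallbackPath != null && !fallbackPath.trim().isEmpty()) {
                return fallbackPath;
            }
            return sourceRootPath;
        }

        public static void main(String[] args) {
            EmptyDirectoryPartitionSketch sketch = new EmptyDirectoryPartitionSketch();
            // Previously setCatalogTable() called fileNames.get(0) unconditionally and threw
            // IndexOutOfBoundsException for an empty directory; with the fallback, partition
            // columns (e.g. dt) can still be inferred from the configured path.
            System.out.println(sketch.getPathForPartitionInference(null)); // prints /tmp/dt=2024-01-01
        }
    }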

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index e55edc13f2..e64eee5795 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -136,7 +136,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
     public void setCatalogTable(CatalogTable catalogTable) {
         this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
         this.seaTunnelRowTypeWithPartition =
-                mergePartitionTypes(fileNames.get(0), catalogTable.getSeaTunnelRowType());
+                mergePartitionTypes(getPathForPartitionInference(null), this.seaTunnelRowType);
     }
 
     boolean checkFileType(String path) {
@@ -427,6 +427,9 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
 
     protected Map<String, String> parsePartitionsByPath(String path) {
         LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
+        if (StringUtils.isBlank(path)) {
+            return partitions;
+        }
         Arrays.stream(path.split("/", -1))
                 .filter(split -> split.contains("="))
                 .map(split -> split.split("=", -1))
@@ -434,6 +437,16 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
         return partitions;
     }
 
+    protected String getPathForPartitionInference(String fallbackPath) {
+        if (!fileNames.isEmpty()) {
+            return fileNames.get(0);
+        }
+        if (StringUtils.isNotBlank(fallbackPath)) {
+            return fallbackPath;
+        }
+        return sourceRootPath;
+    }
+
     protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType seaTunnelRowType) {
         Map<String, String> partitionsMap = parsePartitionsByPath(path);
         if (partitionsMap.isEmpty()) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
index 99e6603518..e9b027c96b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
@@ -223,7 +223,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
         this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
         this.seaTunnelRowTypeWithPartition =
-                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+                mergePartitionTypes(getPathForPartitionInference(path), seaTunnelRowType);
         initFormatter();
         if (pluginConfig.hasPath(FileBaseSourceOptions.READ_COLUMNS.key())) {
             throw new FileConnectorException(
@@ -256,8 +256,9 @@ public class CsvReadStrategy extends AbstractReadStrategy {
     public void setCatalogTable(CatalogTable catalogTable) {
         SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
         this.inputCatalogTable = catalogTable;
+        String partitionPath = getPathForPartitionInference(null);
         SeaTunnelRowType userDefinedRowTypeWithPartition =
-                mergePartitionTypes(fileNames.get(0), rowType);
+                mergePartitionTypes(partitionPath, rowType);
         ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
         encoding =
                 readonlyConfig
@@ -295,7 +296,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
             }
             this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
             this.seaTunnelRowTypeWithPartition =
-                    mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
+                    mergePartitionTypes(partitionPath, this.seaTunnelRowType);
         } else {
             this.seaTunnelRowType = rowType;
             this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
index 5444b16846..145a551de5 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
@@ -218,8 +218,9 @@ public class ExcelReadStrategy extends AbstractReadStrategy {
                     CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                     "Schema information is not set or incorrect Schema 
settings");
         }
+        String partitionPath = getPathForPartitionInference(null);
         SeaTunnelRowType userDefinedRowTypeWithPartition =
-                mergePartitionTypes(fileNames.get(0), rowType);
+                mergePartitionTypes(partitionPath, rowType);
         // column projection
         if (pluginConfig.hasPath(FileBaseSourceOptions.READ_COLUMNS.key())) {
             // get the read column index from user-defined row type
@@ -233,7 +234,7 @@ public class ExcelReadStrategy extends AbstractReadStrategy {
             }
             this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
             this.seaTunnelRowTypeWithPartition =
-                    mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
+                    mergePartitionTypes(partitionPath, this.seaTunnelRowType);
         } else {
             this.seaTunnelRowType = rowType;
             this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index ecaca7a82f..074054c7e8 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -273,7 +273,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
         this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
         this.seaTunnelRowTypeWithPartition =
-                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+                mergePartitionTypes(getPathForPartitionInference(path), seaTunnelRowType);
         initFormatter();
         if (pluginConfig.hasPath(FileBaseSourceOptions.READ_COLUMNS.key())) {
             throw new FileConnectorException(
@@ -302,8 +302,9 @@ public class TextReadStrategy extends AbstractReadStrategy {
     @Override
     public void setCatalogTable(CatalogTable catalogTable) {
         SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+        String partitionPath = getPathForPartitionInference(null);
         SeaTunnelRowType userDefinedRowTypeWithPartition =
-                mergePartitionTypes(fileNames.get(0), rowType);
+                mergePartitionTypes(partitionPath, rowType);
         ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
         Optional<String> fieldDelimiterOptional =
                 readonlyConfig.getOptional(FileBaseSourceOptions.FIELD_DELIMITER);
@@ -343,7 +344,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
             }
             this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
             this.seaTunnelRowTypeWithPartition =
-                    mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
+                    mergePartitionTypes(partitionPath, this.seaTunnelRowType);
         } else {
             this.seaTunnelRowType = rowType;
             this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
index 1a9b2ba297..58002f0f54 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
@@ -186,9 +186,10 @@ public class XmlReadStrategy extends AbstractReadStrategy {
                     "Schema information is undefined or misconfigured, please 
check your configuration file.");
         }
 
+        String partitionPath = getPathForPartitionInference(null);
         if (readColumns.isEmpty()) {
             this.seaTunnelRowType = rowType;
-            this.seaTunnelRowTypeWithPartition = mergePartitionTypes(fileNames.get(0), rowType);
+            this.seaTunnelRowTypeWithPartition = mergePartitionTypes(partitionPath, rowType);
         } else {
             if (readColumns.retainAll(Arrays.asList(rowType.getFieldNames()))) {
                 log.warn(
@@ -205,7 +206,7 @@ public class XmlReadStrategy extends AbstractReadStrategy {
             }
             this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
             this.seaTunnelRowTypeWithPartition =
-                    mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
+                    mergePartitionTypes(partitionPath, this.seaTunnelRowType);
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
index d9b9c6f838..0b4e941364 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
@@ -17,6 +17,15 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.writer.ParquetReadStrategyTest;
 
 import org.apache.avro.Schema;
@@ -40,7 +49,9 @@ import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
 
@@ -227,4 +238,86 @@ public class AbstractReadStrategyTest {
             Assertions.assertTrue(result);
         }
     }
+
+    @Test
+    public void testSetCatalogTableShouldNotThrowWhenFileListIsEmpty() {
+        Config pluginConfig = ConfigFactory.parseMap(buildBasePluginConfigWithPartitions());
+        CatalogTable catalogTable = buildCatalogTable();
+
+        Assertions.assertAll(
+                () -> {
+                    try (ReadStrategy strategy = new TextReadStrategy()) {
+                        assertSetCatalogTableWithEmptyFileNames(
+                                strategy, pluginConfig, catalogTable);
+                    }
+                },
+                () -> {
+                    try (ReadStrategy strategy = new CsvReadStrategy()) {
+                        assertSetCatalogTableWithEmptyFileNames(
+                                strategy, pluginConfig, catalogTable);
+                    }
+                },
+                () -> {
+                    try (ReadStrategy strategy = new ExcelReadStrategy()) {
+                        assertSetCatalogTableWithEmptyFileNames(
+                                strategy, pluginConfig, catalogTable);
+                    }
+                },
+                () -> {
+                    try (ReadStrategy strategy = new XmlReadStrategy()) {
+                        assertSetCatalogTableWithEmptyFileNames(
+                                strategy, pluginConfig, catalogTable);
+                    }
+                },
+                () -> {
+                    try (ReadStrategy strategy = new JsonReadStrategy()) {
+                        assertSetCatalogTableWithEmptyFileNames(
+                                strategy, pluginConfig, catalogTable);
+                    }
+                });
+    }
+
+    @Test
+    public void testGetSeaTunnelRowTypeInfoShouldNotThrowWhenFileListIsEmpty() throws Exception {
+        Config pluginConfig = ConfigFactory.parseMap(buildBasePluginConfigWithPartitions());
+
+        try (TextReadStrategy textReadStrategy = new TextReadStrategy()) {
+            textReadStrategy.setPluginConfig(pluginConfig);
+            SeaTunnelRowType textRowType =
+                    Assertions.assertDoesNotThrow(
+                            () -> textReadStrategy.getSeaTunnelRowTypeInfo("/tmp/dt=2024-01-01"));
+            Assertions.assertEquals(
+                    "dt", textRowType.getFieldNames()[textRowType.getTotalFields() - 1]);
+        }
+
+        try (CsvReadStrategy csvReadStrategy = new CsvReadStrategy()) {
+            csvReadStrategy.setPluginConfig(pluginConfig);
+            SeaTunnelRowType csvRowType =
+                    Assertions.assertDoesNotThrow(
+                            () -> csvReadStrategy.getSeaTunnelRowTypeInfo("/tmp/dt=2024-01-01"));
+            Assertions.assertEquals(
+                    "dt", csvRowType.getFieldNames()[csvRowType.getTotalFields() - 1]);
+        }
+    }
+
+    private static Map<String, Object> buildBasePluginConfigWithPartitions() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(FileBaseSourceOptions.FILE_PATH.key(), "/tmp/dt=2024-01-01");
+        return config;
+    }
+
+    private static CatalogTable buildCatalogTable() {
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id"}, new SeaTunnelDataType[] 
{BasicType.INT_TYPE});
+        return CatalogTableUtil.getCatalogTable("test", rowType);
+    }
+
+    private static void assertSetCatalogTableWithEmptyFileNames(
+            ReadStrategy readStrategy, Config pluginConfig, CatalogTable catalogTable) {
+        readStrategy.setPluginConfig(pluginConfig);
+        Assertions.assertDoesNotThrow(() -> readStrategy.setCatalogTable(catalogTable));
+        SeaTunnelRowType actualRowType = readStrategy.getActualSeaTunnelRowTypeInfo();
+        Assertions.assertArrayEquals(new String[] {"id", "dt"}, actualRowType.getFieldNames());
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
index fde61f1f3a..015ec79690 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
@@ -129,6 +129,19 @@ public class HdfsFileIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, readResult.getExitCode());
     }
 
+    @TestTemplate
+    public void testHdfsReadEmptyTextDirectory(TestContainer container)
+            throws IOException, InterruptedException {
+        nameNode.execInContainer("bash", "-c", "hdfs dfs -rm -r -f /empty/text 
|| true");
+        org.testcontainers.containers.Container.ExecResult mkdirResult =
+                nameNode.execInContainer("hdfs", "dfs", "-mkdir", "-p", 
"/empty/text");
+        Assertions.assertEquals(0, mkdirResult.getExitCode());
+
+        org.testcontainers.containers.Container.ExecResult readResult =
+                container.executeJob("/hdfs_empty_text_to_assert.conf");
+        Assertions.assertEquals(0, readResult.getExitCode());
+    }
+
     @TestTemplate
     public void testHdfsBinaryUpdateModeDistcp(TestContainer container)
             throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_empty_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_empty_text_to_assert.conf
new file mode 100644
index 0000000000..34772c19ee
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_empty_text_to_assert.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  HdfsFile {
+    fs.defaultFS = "hdfs://namenode1:9000"
+    path = "/empty/text"
+    file_format_type = "text"
+    schema = {
+      fields {
+        id = int
+        name = string
+      }
+    }
+    hadoop_conf = {
+      "dfs.replication" = 1
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 0
+        }
+      ]
+    }
+  }
+}
+
