This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ec8919a877 [Bug][Connector-V2][File] Fix IndexOutOfBoundsException when reading empty directories (#10373)
ec8919a877 is described below
commit ec8919a8776271f9a04a2355607eab63f94a8c63
Author: yzeng1618 <[email protected]>
AuthorDate: Fri Jan 23 16:13:52 2026 +0800
[Bug][Connector-V2][File] Fix IndexOutOfBoundsException when reading empty directories (#10373)
Co-authored-by: zengyi <[email protected]>
---
.../file/source/reader/AbstractReadStrategy.java | 15 +++-
.../file/source/reader/CsvReadStrategy.java | 7 +-
.../file/source/reader/ExcelReadStrategy.java | 5 +-
.../file/source/reader/TextReadStrategy.java | 7 +-
.../file/source/reader/XmlReadStrategy.java | 5 +-
.../source/reader/AbstractReadStrategyTest.java | 93 ++++++++++++++++++++++
.../e2e/connector/file/hdfs/HdfsFileIT.java | 13 +++
.../test/resources/hdfs_empty_text_to_assert.conf | 52 ++++++++++++
8 files changed, 186 insertions(+), 11 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index e55edc13f2..e64eee5795 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -136,7 +136,7 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
public void setCatalogTable(CatalogTable catalogTable) {
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
this.seaTunnelRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0),
catalogTable.getSeaTunnelRowType());
+ mergePartitionTypes(getPathForPartitionInference(null),
this.seaTunnelRowType);
}
boolean checkFileType(String path) {
@@ -427,6 +427,9 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
protected Map<String, String> parsePartitionsByPath(String path) {
LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
+ if (StringUtils.isBlank(path)) {
+ return partitions;
+ }
Arrays.stream(path.split("/", -1))
.filter(split -> split.contains("="))
.map(split -> split.split("=", -1))
@@ -434,6 +437,16 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
return partitions;
}
+ protected String getPathForPartitionInference(String fallbackPath) {
+ if (!fileNames.isEmpty()) {
+ return fileNames.get(0);
+ }
+ if (StringUtils.isNotBlank(fallbackPath)) {
+ return fallbackPath;
+ }
+ return sourceRootPath;
+ }
+
protected SeaTunnelRowType mergePartitionTypes(String path,
SeaTunnelRowType seaTunnelRowType) {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
if (partitionsMap.isEmpty()) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
index 99e6603518..e9b027c96b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
@@ -223,7 +223,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
this.seaTunnelRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+ mergePartitionTypes(getPathForPartitionInference(path),
seaTunnelRowType);
initFormatter();
if (pluginConfig.hasPath(FileBaseSourceOptions.READ_COLUMNS.key())) {
throw new FileConnectorException(
@@ -256,8 +256,9 @@ public class CsvReadStrategy extends AbstractReadStrategy {
public void setCatalogTable(CatalogTable catalogTable) {
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
this.inputCatalogTable = catalogTable;
+ String partitionPath = getPathForPartitionInference(null);
SeaTunnelRowType userDefinedRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0), rowType);
+ mergePartitionTypes(partitionPath, rowType);
ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(pluginConfig);
encoding =
readonlyConfig
@@ -295,7 +296,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
}
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
this.seaTunnelRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0),
this.seaTunnelRowType);
+ mergePartitionTypes(partitionPath, this.seaTunnelRowType);
} else {
this.seaTunnelRowType = rowType;
this.seaTunnelRowTypeWithPartition =
userDefinedRowTypeWithPartition;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
index 5444b16846..145a551de5 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
@@ -218,8 +218,9 @@ public class ExcelReadStrategy extends AbstractReadStrategy
{
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
"Schema information is not set or incorrect Schema
settings");
}
+ String partitionPath = getPathForPartitionInference(null);
SeaTunnelRowType userDefinedRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0), rowType);
+ mergePartitionTypes(partitionPath, rowType);
// column projection
if (pluginConfig.hasPath(FileBaseSourceOptions.READ_COLUMNS.key())) {
// get the read column index from user-defined row type
@@ -233,7 +234,7 @@ public class ExcelReadStrategy extends AbstractReadStrategy
{
}
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
this.seaTunnelRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0),
this.seaTunnelRowType);
+ mergePartitionTypes(partitionPath, this.seaTunnelRowType);
} else {
this.seaTunnelRowType = rowType;
this.seaTunnelRowTypeWithPartition =
userDefinedRowTypeWithPartition;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index ecaca7a82f..074054c7e8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -273,7 +273,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
this.seaTunnelRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+ mergePartitionTypes(getPathForPartitionInference(path),
seaTunnelRowType);
initFormatter();
if (pluginConfig.hasPath(FileBaseSourceOptions.READ_COLUMNS.key())) {
throw new FileConnectorException(
@@ -302,8 +302,9 @@ public class TextReadStrategy extends AbstractReadStrategy {
@Override
public void setCatalogTable(CatalogTable catalogTable) {
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+ String partitionPath = getPathForPartitionInference(null);
SeaTunnelRowType userDefinedRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0), rowType);
+ mergePartitionTypes(partitionPath, rowType);
ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(pluginConfig);
Optional<String> fieldDelimiterOptional =
readonlyConfig.getOptional(FileBaseSourceOptions.FIELD_DELIMITER);
@@ -343,7 +344,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
}
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
this.seaTunnelRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0),
this.seaTunnelRowType);
+ mergePartitionTypes(partitionPath, this.seaTunnelRowType);
} else {
this.seaTunnelRowType = rowType;
this.seaTunnelRowTypeWithPartition =
userDefinedRowTypeWithPartition;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
index 1a9b2ba297..58002f0f54 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
@@ -186,9 +186,10 @@ public class XmlReadStrategy extends AbstractReadStrategy {
"Schema information is undefined or misconfigured, please
check your configuration file.");
}
+ String partitionPath = getPathForPartitionInference(null);
if (readColumns.isEmpty()) {
this.seaTunnelRowType = rowType;
- this.seaTunnelRowTypeWithPartition =
mergePartitionTypes(fileNames.get(0), rowType);
+ this.seaTunnelRowTypeWithPartition =
mergePartitionTypes(partitionPath, rowType);
} else {
if (readColumns.retainAll(Arrays.asList(rowType.getFieldNames())))
{
log.warn(
@@ -205,7 +206,7 @@ public class XmlReadStrategy extends AbstractReadStrategy {
}
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
this.seaTunnelRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0),
this.seaTunnelRowType);
+ mergePartitionTypes(partitionPath, this.seaTunnelRowType);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
index d9b9c6f838..0b4e941364 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
@@ -17,6 +17,15 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.file.writer.ParquetReadStrategyTest;
import org.apache.avro.Schema;
@@ -40,7 +49,9 @@ import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
@@ -227,4 +238,86 @@ public class AbstractReadStrategyTest {
Assertions.assertTrue(result);
}
}
+
+ @Test
+ public void testSetCatalogTableShouldNotThrowWhenFileListIsEmpty() {
+ Config pluginConfig =
ConfigFactory.parseMap(buildBasePluginConfigWithPartitions());
+ CatalogTable catalogTable = buildCatalogTable();
+
+ Assertions.assertAll(
+ () -> {
+ try (ReadStrategy strategy = new TextReadStrategy()) {
+ assertSetCatalogTableWithEmptyFileNames(
+ strategy, pluginConfig, catalogTable);
+ }
+ },
+ () -> {
+ try (ReadStrategy strategy = new CsvReadStrategy()) {
+ assertSetCatalogTableWithEmptyFileNames(
+ strategy, pluginConfig, catalogTable);
+ }
+ },
+ () -> {
+ try (ReadStrategy strategy = new ExcelReadStrategy()) {
+ assertSetCatalogTableWithEmptyFileNames(
+ strategy, pluginConfig, catalogTable);
+ }
+ },
+ () -> {
+ try (ReadStrategy strategy = new XmlReadStrategy()) {
+ assertSetCatalogTableWithEmptyFileNames(
+ strategy, pluginConfig, catalogTable);
+ }
+ },
+ () -> {
+ try (ReadStrategy strategy = new JsonReadStrategy()) {
+ assertSetCatalogTableWithEmptyFileNames(
+ strategy, pluginConfig, catalogTable);
+ }
+ });
+ }
+
+ @Test
+ public void testGetSeaTunnelRowTypeInfoShouldNotThrowWhenFileListIsEmpty()
throws Exception {
+ Config pluginConfig =
ConfigFactory.parseMap(buildBasePluginConfigWithPartitions());
+
+ try (TextReadStrategy textReadStrategy = new TextReadStrategy()) {
+ textReadStrategy.setPluginConfig(pluginConfig);
+ SeaTunnelRowType textRowType =
+ Assertions.assertDoesNotThrow(
+ () ->
textReadStrategy.getSeaTunnelRowTypeInfo("/tmp/dt=2024-01-01"));
+ Assertions.assertEquals(
+ "dt",
textRowType.getFieldNames()[textRowType.getTotalFields() - 1]);
+ }
+
+ try (CsvReadStrategy csvReadStrategy = new CsvReadStrategy()) {
+ csvReadStrategy.setPluginConfig(pluginConfig);
+ SeaTunnelRowType csvRowType =
+ Assertions.assertDoesNotThrow(
+ () ->
csvReadStrategy.getSeaTunnelRowTypeInfo("/tmp/dt=2024-01-01"));
+ Assertions.assertEquals(
+ "dt",
csvRowType.getFieldNames()[csvRowType.getTotalFields() - 1]);
+ }
+ }
+
+ private static Map<String, Object> buildBasePluginConfigWithPartitions() {
+ Map<String, Object> config = new HashMap<>();
+ config.put(FileBaseSourceOptions.FILE_PATH.key(),
"/tmp/dt=2024-01-01");
+ return config;
+ }
+
+ private static CatalogTable buildCatalogTable() {
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.INT_TYPE});
+ return CatalogTableUtil.getCatalogTable("test", rowType);
+ }
+
+ private static void assertSetCatalogTableWithEmptyFileNames(
+ ReadStrategy readStrategy, Config pluginConfig, CatalogTable
catalogTable) {
+ readStrategy.setPluginConfig(pluginConfig);
+ Assertions.assertDoesNotThrow(() ->
readStrategy.setCatalogTable(catalogTable));
+ SeaTunnelRowType actualRowType =
readStrategy.getActualSeaTunnelRowTypeInfo();
+ Assertions.assertArrayEquals(new String[] {"id", "dt"},
actualRowType.getFieldNames());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
index fde61f1f3a..015ec79690 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
@@ -129,6 +129,19 @@ public class HdfsFileIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, readResult.getExitCode());
}
+ @TestTemplate
+ public void testHdfsReadEmptyTextDirectory(TestContainer container)
+ throws IOException, InterruptedException {
+ nameNode.execInContainer("bash", "-c", "hdfs dfs -rm -r -f /empty/text
|| true");
+ org.testcontainers.containers.Container.ExecResult mkdirResult =
+ nameNode.execInContainer("hdfs", "dfs", "-mkdir", "-p",
"/empty/text");
+ Assertions.assertEquals(0, mkdirResult.getExitCode());
+
+ org.testcontainers.containers.Container.ExecResult readResult =
+ container.executeJob("/hdfs_empty_text_to_assert.conf");
+ Assertions.assertEquals(0, readResult.getExitCode());
+ }
+
@TestTemplate
public void testHdfsBinaryUpdateModeDistcp(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_empty_text_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_empty_text_to_assert.conf
new file mode 100644
index 0000000000..34772c19ee
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_empty_text_to_assert.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ HdfsFile {
+ fs.defaultFS = "hdfs://namenode1:9000"
+ path = "/empty/text"
+ file_format_type = "text"
+ schema = {
+ fields {
+ id = int
+ name = string
+ }
+ }
+ hadoop_conf = {
+ "dfs.replication" = 1
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 0
+ }
+ ]
+ }
+ }
+}
+