This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 02c7eb3177 [Fix][connector-hive] Fix the file_name_expression does not take effect in Hive sink. (#9823)
02c7eb3177 is described below

commit 02c7eb3177989bcd50ba6c1059862c1586d3fa39
Author: Adam Wang <[email protected]>
AuthorDate: Tue Sep 16 22:29:42 2025 +0800

    [Fix][connector-hive] Fix the file_name_expression does not take effect in Hive sink. (#9823)
    
    Co-authored-by: wangxiaogang <[email protected]>
---
 .../connectors/seatunnel/hive/sink/HiveSink.java   |  10 +-
 .../seatunnel/hive/sink/HiveSinkConfigTest.java    | 172 +++++++++++++++++++++
 2 files changed, 179 insertions(+), 3 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index a7c6e75a85..3862e2e06c 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -143,9 +143,6 @@ public class HiveSink
                         .withValue(
                                 IS_PARTITION_FIELD_WRITE_IN_FILE.key(),
                                 ConfigValueFactory.fromAnyRef(false))
-                        .withValue(
-                                FILE_NAME_EXPRESSION.key(),
-                                ConfigValueFactory.fromAnyRef("${transactionId}"))
                         .withValue(
                                 FILE_PATH.key(),
                                 ConfigValueFactory.fromAnyRef(
@@ -153,6 +150,13 @@ public class HiveSink
                         .withValue(SINK_COLUMNS.key(), ConfigValueFactory.fromAnyRef(sinkFields))
                         .withValue(
                                 PARTITION_BY.key(), ConfigValueFactory.fromAnyRef(partitionKeys));
+        // Only set a default file_name_expression when it's not provided by user config.
+        if (!pluginConfig.hasPath(FILE_NAME_EXPRESSION.key())) {
+            pluginConfig =
+                    pluginConfig.withValue(
+                            FILE_NAME_EXPRESSION.key(),
+                            ConfigValueFactory.fromAnyRef("${transactionId}"));
+        }
 
         return new FileSinkConfig(pluginConfig, catalogTable.getSeaTunnelRowType());
     }
diff --git a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfigTest.java b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfigTest.java
new file mode 100644
index 0000000000..e8c9ef8bcb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfigTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Unit tests for HiveSink config generation focusing on file_name_expression handling. */
+public class HiveSinkConfigTest {
+
+    @Test
+    void testDefaultFileNameExpressionAppliedWhenAbsent() throws Exception {
+        // Build minimal input config without file_name_expression
+        Map<String, Object> options = new HashMap<>();
+        options.put(HiveOptions.TABLE_NAME.key(), "default.test_table");
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(options);
+
+        // Mock Hive table metadata and file format
+        try (MockedStatic<HiveTableUtils> mockedStatic = Mockito.mockStatic(HiveTableUtils.class)) {
+            Table table =
+                    mockTextTable(
+                            "default",
+                            "test_table",
+                            "file:/tmp/hive/test_table",
+                            listOf(
+                                    new FieldSchema("id", "string", null),
+                                    new FieldSchema("name", "string", null)),
+                            new ArrayList<>());
+            mockedStatic.when(() -> HiveTableUtils.getTableInfo(Mockito.any())).thenReturn(table);
+            mockedStatic
+                    .when(() -> HiveTableUtils.parseFileFormat(Mockito.any(Table.class)))
+                    .thenCallRealMethod(); // inputFormat set in table, real method will return TEXT
+
+            CatalogTable catalogTable = buildCatalogTable();
+            HiveSink hiveSink = new HiveSink(readonlyConfig, catalogTable);
+            FileSinkConfig cfg = extractFileSinkConfig(hiveSink);
+            Assertions.assertEquals(
+                    FileBaseSinkOptions.DEFAULT_FILE_NAME_EXPRESSION,
+                    cfg.getFileNameExpression(),
+                    "Should apply default ${transactionId} when user didn't configure file_name_expression");
+        }
+    }
+
+    @Test
+    void testRespectUserProvidedFileNameExpression() throws Exception {
+        // Provide custom file_name_expression and disable transaction to pass validation
+        Map<String, Object> options = new HashMap<>();
+        options.put(HiveOptions.TABLE_NAME.key(), "default.test_table");
+        options.put(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key(), "orders_${uuid}");
+        options.put(FileBaseSinkOptions.IS_ENABLE_TRANSACTION.key(), false);
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(options);
+
+        try (MockedStatic<HiveTableUtils> mockedStatic = Mockito.mockStatic(HiveTableUtils.class)) {
+            Table table =
+                    mockTextTable(
+                            "default",
+                            "test_table",
+                            "file:/tmp/hive/test_table",
+                            listOf(new FieldSchema("id", "string", null)),
+                            new ArrayList<>());
+            mockedStatic.when(() -> HiveTableUtils.getTableInfo(Mockito.any())).thenReturn(table);
+            mockedStatic
+                    .when(() -> HiveTableUtils.parseFileFormat(Mockito.any(Table.class)))
+                    .thenCallRealMethod();
+
+            CatalogTable catalogTable = buildCatalogTable();
+            HiveSink hiveSink = new HiveSink(readonlyConfig, catalogTable);
+            FileSinkConfig cfg = extractFileSinkConfig(hiveSink);
+            Assertions.assertEquals(
+                    "orders_${uuid}",
+                    cfg.getFileNameExpression(),
+                    "HiveSink should not override user-provided file_name_expression");
+        }
+    }
+
+    private static CatalogTable buildCatalogTable() {
+        TableSchema schema =
+                TableSchema.builder()
+                        .column(
+                                PhysicalColumn.of(
+                                        "id", BasicType.STRING_TYPE, 100L, true, null, null))
+                        .column(
+                                PhysicalColumn.of(
+                                        "name", BasicType.STRING_TYPE, 100L, true, null, null))
+                        .build();
+        return CatalogTable.of(
+                TableIdentifier.of("test_catalog", "default", "test_table"),
+                schema,
+                new HashMap<>(),
+                new ArrayList<>(),
+                "");
+    }
+
+    private static FileSinkConfig extractFileSinkConfig(HiveSink hiveSink) throws Exception {
+        Field f = HiveSink.class.getDeclaredField("fileSinkConfig");
+        f.setAccessible(true);
+        return (FileSinkConfig) f.get(hiveSink);
+    }
+
+    private static List<FieldSchema> listOf(FieldSchema... fs) {
+        List<FieldSchema> l = new ArrayList<>();
+        for (FieldSchema f : fs) {
+            l.add(f);
+        }
+        return l;
+    }
+
+    private static Table mockTextTable(
+            String db,
+            String tableName,
+            String location,
+            List<FieldSchema> cols,
+            List<FieldSchema> partitions) {
+        Table t = new Table();
+        t.setDbName(db);
+        t.setTableName(tableName);
+
+        SerDeInfo serDeInfo = new SerDeInfo();
+        Map<String, String> params = new HashMap<>();
+        params.put("field.delim", ",");
+        params.put("line.delim", "\n");
+        serDeInfo.setParameters(params);
+
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setSerdeInfo(serDeInfo);
+        sd.setCols(cols);
+        sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
+        sd.setLocation(location);
+        t.setSd(sd);
+        t.setPartitionKeys(partitions);
+        return t;
+    }
+}
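
For reference, here is a minimal standalone sketch of the guard pattern the HiveSink.java change introduces. It is written against the plain Typesafe Config API purely for illustration (an assumption: SeaTunnel itself uses a shaded copy of that library, and the key name below is only a stand-in for FILE_NAME_EXPRESSION.key()):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import java.util.Collections;

public class FileNameExpressionDefaultSketch {

    // Stand-in for FILE_NAME_EXPRESSION.key() from the connector options.
    private static final String FILE_NAME_EXPRESSION = "file_name_expression";

    // Apply the ${transactionId} default only when the user did not set the key,
    // mirroring the hasPath(...) guard added to HiveSink.
    static Config withDefaultFileNameExpression(Config pluginConfig) {
        if (!pluginConfig.hasPath(FILE_NAME_EXPRESSION)) {
            return pluginConfig.withValue(
                    FILE_NAME_EXPRESSION,
                    ConfigValueFactory.fromAnyRef("${transactionId}"));
        }
        // A user-provided expression is kept untouched.
        return pluginConfig;
    }

    public static void main(String[] args) {
        Config userConfig =
                ConfigFactory.parseMap(
                        Collections.singletonMap(FILE_NAME_EXPRESSION, "orders_${uuid}"));
        // Prints orders_${uuid}: the user's expression is no longer overridden.
        System.out.println(
                withDefaultFileNameExpression(userConfig).getString(FILE_NAME_EXPRESSION));

        // Prints ${transactionId}: the default still applies when the key is absent.
        System.out.println(
                withDefaultFileNameExpression(ConfigFactory.empty())
                        .getString(FILE_NAME_EXPRESSION));
    }
}

Before this change the default was applied unconditionally, so a user-supplied file_name_expression in the Hive sink config was silently replaced by ${transactionId}; the new unit tests above cover both the default and the user-provided case.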
