This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5f4e944378 [Bug][Connector-V2][Email] Fix NPE on null values, add 
configurable attachment name and delimiter (#10112)
5f4e944378 is described below

commit 5f4e944378815f751e026f7af319e456d850b566
Author: zhan7236 <[email protected]>
AuthorDate: Wed Nov 26 22:43:08 2025 +0800

    [Bug][Connector-V2][Email] Fix NPE on null values, add configurable 
attachment name and delimiter (#10112)
---
 docs/en/connector-v2/sink/Email.md                 | 12 +++
 docs/zh/connector-v2/sink/Email.md                 | 22 +++--
 .../seatunnel/email/config/EmailSinkConfig.java    |  6 ++
 .../seatunnel/email/config/EmailSinkOptions.java   | 13 +++
 .../seatunnel/email/sink/EmailSinkWriter.java      | 27 ++++--
 .../seatunnel/email/EmailSinkWriterTest.java       | 98 ++++++++++++++++++++++
 6 files changed, 168 insertions(+), 10 deletions(-)

diff --git a/docs/en/connector-v2/sink/Email.md 
b/docs/en/connector-v2/sink/Email.md
index ea8f04ce00..6d603ee6c9 100644
--- a/docs/en/connector-v2/sink/Email.md
+++ b/docs/en/connector-v2/sink/Email.md
@@ -27,6 +27,8 @@ The tested email version is 1.5.6.
 | email_authorization_code | string  | no       | -             |
 | email_message_headline   | string  | yes      | -             |
 | email_message_content    | string  | yes      | -             |
+| email_attachment_name    | string  | no       | emailsink.csv |
+| email_field_delimiter    | string  | no       | ,             |
 | common-options           |         | no       | -             |
 
 ### email_from_address [string]
@@ -65,6 +67,14 @@ The subject line of the entire message.
 
 The body of the entire message.
 
+### email_attachment_name [string]
+
+The name of the email attachment file. Default is `emailsink.csv`.
+
+### email_field_delimiter [string]
+
+The delimiter used to separate fields in the attachment file. Default is comma 
`,`.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](../sink-common-options.md) for details.
@@ -82,6 +92,8 @@ Sink plugin common parameters, please refer to [Sink Common 
Options](../sink-com
       email_authorization_code=""
       email_message_headline=""
       email_message_content=""
+      email_attachment_name="report.csv"  # Optional, default is emailsink.csv
+      email_field_delimiter="|"           # Optional, default is ,
    }
 
 ```
diff --git a/docs/zh/connector-v2/sink/Email.md 
b/docs/zh/connector-v2/sink/Email.md
index 472cf6c8e3..261cb67df9 100644
--- a/docs/zh/connector-v2/sink/Email.md
+++ b/docs/zh/connector-v2/sink/Email.md
@@ -25,11 +25,13 @@ import ChangeLog from '../changelog/connector-email.md';
 | email_host               | string  | 是    | -   |
 | email_transport_protocol | string  | 是    | -   |
 | email_smtp_auth          | boolean | 是    | -   |
-| email_smtp_port          | int     | 否    | 465 |
-| email_authorization_code | string  | 否    | -   |
-| email_message_headline   | string  | 是    | -   |
-| email_message_content    | string  | 是    | -   |
-| common-options           |         | 否    | -   |
+| email_smtp_port          | int     | 否    | 465           |
+| email_authorization_code | string  | 否    | -             |
+| email_message_headline   | string  | 是    | -             |
+| email_message_content    | string  | 是    | -             |
+| email_attachment_name    | string  | 否    | emailsink.csv |
+| email_field_delimiter    | string  | 否    | ,             |
+| common-options           |         | 否    | -             |
 
 ### email_from_address [string]
 
@@ -67,6 +69,14 @@ import ChangeLog from '../changelog/connector-email.md';
 
 邮件消息的正文
 
+### email_attachment_name [string]
+
+邮件附件的文件名。默认为 `emailsink.csv`。
+
+### email_field_delimiter [string]
+
+附件文件中用于分隔字段的分隔符。默认为逗号 `,`。
+
 ### common options
 
 Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md) 了解详情.
@@ -84,6 +94,8 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
       email_authorization_code=""
       email_message_headline=""
       email_message_content=""
+      email_attachment_name="report.csv"  # 可选,默认为 emailsink.csv
+      email_field_delimiter="|"           # 可选,默认为 ,
    }
 
 ```
diff --git 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
index 012be8586d..e8ee87a936 100644
--- 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
@@ -24,7 +24,9 @@ import lombok.NonNull;
 
 import java.io.Serializable;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_ATTACHMENT_NAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_AUTHORIZATION_CODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_FIELD_DELIMITER;
 import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_FROM_ADDRESS;
 import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_HOST;
 import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_MESSAGE_CONTENT;
@@ -45,6 +47,8 @@ public class EmailSinkConfig implements Serializable {
     private String emailTransportProtocol;
     private Boolean emailSmtpAuth;
     private Integer emailSmtpPort;
+    private String emailAttachmentName;
+    private String emailFieldDelimiter;
 
     public EmailSinkConfig(@NonNull ReadonlyConfig pluginConfig) {
         super();
@@ -57,5 +61,7 @@ public class EmailSinkConfig implements Serializable {
         this.emailTransportProtocol = 
pluginConfig.get(EMAIL_TRANSPORT_PROTOCOL);
         this.emailSmtpAuth = pluginConfig.get(EMAIL_SMTP_AUTH);
         this.emailSmtpPort = pluginConfig.get(EMAIL_SMTP_PORT);
+        this.emailAttachmentName = pluginConfig.get(EMAIL_ATTACHMENT_NAME);
+        this.emailFieldDelimiter = pluginConfig.get(EMAIL_FIELD_DELIMITER);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkOptions.java
 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkOptions.java
index b3448b9dca..6d357ce5bf 100644
--- 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkOptions.java
@@ -74,4 +74,17 @@ public class EmailSinkOptions {
                     .intType()
                     .defaultValue(465)
                     .withDescription("Select port for authentication.");
+
+    public static final Option<String> EMAIL_ATTACHMENT_NAME =
+            Options.key("email_attachment_name")
+                    .stringType()
+                    .defaultValue("emailsink.csv")
+                    .withDescription("The name of the email attachment file");
+
+    public static final Option<String> EMAIL_FIELD_DELIMITER =
+            Options.key("email_field_delimiter")
+                    .stringType()
+                    .defaultValue(",")
+                    .withDescription(
+                            "The delimiter used to separate fields in the 
attachment file");
 }
diff --git 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
index f7fe04c068..4e25b3f976 100644
--- 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
@@ -57,26 +57,43 @@ public class EmailSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
     private final SeaTunnelRowType seaTunnelRowType;
     private final EmailSinkConfig config;
     private StringBuffer stringBuffer;
+    private boolean hasData;
 
     public EmailSinkWriter(SeaTunnelRowType seaTunnelRowType, EmailSinkConfig 
pluginConfig) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.config = pluginConfig;
         this.stringBuffer = new StringBuffer();
+        this.hasData = false;
     }
 
     @Override
     public void write(SeaTunnelRow element) {
         Object[] fields = element.getFields();
 
-        for (Object field : fields) {
-            stringBuffer.append(field.toString() + ",");
+        for (int i = 0; i < fields.length; i++) {
+            Object field = fields[i];
+            // Handle null field values to avoid NPE
+            if (field == null) {
+                stringBuffer.append("");
+            } else {
+                stringBuffer.append(field.toString());
+            }
+            if (i < fields.length - 1) {
+                stringBuffer.append(config.getEmailFieldDelimiter());
+            }
         }
-        stringBuffer.deleteCharAt(fields.length - 1);
         stringBuffer.append("\n");
+        hasData = true;
     }
 
     @Override
     public void close() {
+        // Only send email if there was data written successfully
+        if (!hasData) {
+            log.info("No data to send, skipping email");
+            return;
+        }
+
         createFile();
         Properties properties = new Properties();
         properties.setProperty("mail.host", config.getEmailHost());
@@ -136,7 +153,7 @@ public class EmailSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
             multipart.addBodyPart(messageBodyPart);
             // accessory
             messageBodyPart = new MimeBodyPart();
-            String filename = "emailsink.csv";
+            String filename = config.getEmailAttachmentName();
             DataSource source = new FileDataSource(filename);
             messageBodyPart.setDataHandler(new DataHandler(source));
             messageBodyPart.setFileName(filename);
@@ -153,7 +170,7 @@ public class EmailSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
     }
 
     public void createFile() {
-        String fileName = "emailsink.csv";
+        String fileName = config.getEmailAttachmentName();
         try {
             String data = stringBuffer.toString();
             File file = new File(fileName);
diff --git 
a/seatunnel-connectors-v2/connector-email/src/test/java/org/apache/seatunnel/connectors/seatunnel/email/EmailSinkWriterTest.java
 
b/seatunnel-connectors-v2/connector-email/src/test/java/org/apache/seatunnel/connectors/seatunnel/email/EmailSinkWriterTest.java
new file mode 100644
index 0000000000..f6ebc553df
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-email/src/test/java/org/apache/seatunnel/connectors/seatunnel/email/EmailSinkWriterTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.email;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.email.sink.EmailSinkWriter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EmailSinkWriterTest {
+
+    @Test
+    void testWriteWithNullValues() {
+        // Create a mock config
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("email_from_address", "[email protected]");
+        configMap.put("email_to_address", "[email protected]");
+        configMap.put("email_authorization_code", "code");
+        configMap.put("email_message_headline", "Test");
+        configMap.put("email_message_content", "Test content");
+        configMap.put("email_host", "smtp.example.com");
+        configMap.put("email_transport_protocol", "smtp");
+        configMap.put("email_smtp_auth", true);
+        configMap.put("email_smtp_port", 465);
+        configMap.put("email_attachment_name", "test.csv");
+        configMap.put("email_field_delimiter", ",");
+
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+        EmailSinkConfig sinkConfig = new EmailSinkConfig(config);
+
+        // Create row type with string fields
+        String[] fieldNames = {"field1", "field2", "field3"};
+        SeaTunnelDataType<?>[] fieldTypes = {
+            BasicType.STRING_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE
+        };
+        SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, 
fieldTypes);
+
+        // Create writer
+        EmailSinkWriter writer = new EmailSinkWriter(rowType, sinkConfig);
+
+        // Test writing row with null values - should not throw NPE
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {"value1", null, 
"value3"});
+
+        Assertions.assertDoesNotThrow(() -> writer.write(row));
+
+        // Test writing row with all null values - should not throw NPE
+        SeaTunnelRow nullRow = new SeaTunnelRow(new Object[] {null, null, 
null});
+
+        Assertions.assertDoesNotThrow(() -> writer.write(nullRow));
+    }
+
+    @Test
+    void testCustomDelimiter() {
+        // Create a mock config with custom delimiter
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("email_from_address", "[email protected]");
+        configMap.put("email_to_address", "[email protected]");
+        configMap.put("email_authorization_code", "code");
+        configMap.put("email_message_headline", "Test");
+        configMap.put("email_message_content", "Test content");
+        configMap.put("email_host", "smtp.example.com");
+        configMap.put("email_transport_protocol", "smtp");
+        configMap.put("email_smtp_auth", true);
+        configMap.put("email_smtp_port", 465);
+        configMap.put("email_attachment_name", "test.csv");
+        configMap.put("email_field_delimiter", "|");
+
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+        EmailSinkConfig sinkConfig = new EmailSinkConfig(config);
+
+        Assertions.assertEquals("|", sinkConfig.getEmailFieldDelimiter());
+        Assertions.assertEquals("test.csv", 
sinkConfig.getEmailAttachmentName());
+    }
+}

Reply via email to