This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5f4e944378 [Bug][Connector-V2][Email] Fix NPE on null values, add
configurable attachment name and delimiter (#10112)
5f4e944378 is described below
commit 5f4e944378815f751e026f7af319e456d850b566
Author: zhan7236 <[email protected]>
AuthorDate: Wed Nov 26 22:43:08 2025 +0800
[Bug][Connector-V2][Email] Fix NPE on null values, add configurable
attachment name and delimiter (#10112)
---
docs/en/connector-v2/sink/Email.md | 12 +++
docs/zh/connector-v2/sink/Email.md | 22 +++--
.../seatunnel/email/config/EmailSinkConfig.java | 6 ++
.../seatunnel/email/config/EmailSinkOptions.java | 13 +++
.../seatunnel/email/sink/EmailSinkWriter.java | 27 ++++--
.../seatunnel/email/EmailSinkWriterTest.java | 98 ++++++++++++++++++++++
6 files changed, 168 insertions(+), 10 deletions(-)
diff --git a/docs/en/connector-v2/sink/Email.md
b/docs/en/connector-v2/sink/Email.md
index ea8f04ce00..6d603ee6c9 100644
--- a/docs/en/connector-v2/sink/Email.md
+++ b/docs/en/connector-v2/sink/Email.md
@@ -27,6 +27,8 @@ The tested email version is 1.5.6.
| email_authorization_code | string | no | - |
| email_message_headline | string | yes | - |
| email_message_content | string | yes | - |
+| email_attachment_name | string | no | emailsink.csv |
+| email_field_delimiter | string | no | , |
| common-options | | no | - |
### email_from_address [string]
@@ -65,6 +67,14 @@ The subject line of the entire message.
The body of the entire message.
+### email_attachment_name [string]
+
+The name of the email attachment file. Default is `emailsink.csv`.
+
+### email_field_delimiter [string]
+
+The delimiter used to separate fields in the attachment file. Default is comma
`,`.
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](../sink-common-options.md) for details.
@@ -82,6 +92,8 @@ Sink plugin common parameters, please refer to [Sink Common
Options](../sink-com
email_authorization_code=""
email_message_headline=""
email_message_content=""
+ email_attachment_name="report.csv" # Optional, default is emailsink.csv
+ email_field_delimiter="|" # Optional, default is ,
}
```
diff --git a/docs/zh/connector-v2/sink/Email.md
b/docs/zh/connector-v2/sink/Email.md
index 472cf6c8e3..261cb67df9 100644
--- a/docs/zh/connector-v2/sink/Email.md
+++ b/docs/zh/connector-v2/sink/Email.md
@@ -25,11 +25,13 @@ import ChangeLog from '../changelog/connector-email.md';
| email_host | string | 是 | - |
| email_transport_protocol | string | 是 | - |
| email_smtp_auth | boolean | 是 | - |
-| email_smtp_port | int | 否 | 465 |
-| email_authorization_code | string | 否 | - |
-| email_message_headline | string | 是 | - |
-| email_message_content | string | 是 | - |
-| common-options | | 否 | - |
+| email_smtp_port | int | 否 | 465 |
+| email_authorization_code | string | 否 | - |
+| email_message_headline | string | 是 | - |
+| email_message_content | string | 是 | - |
+| email_attachment_name | string | 否 | emailsink.csv |
+| email_field_delimiter | string | 否 | , |
+| common-options | | 否 | - |
### email_from_address [string]
@@ -67,6 +69,14 @@ import ChangeLog from '../changelog/connector-email.md';
邮件消息的正文
+### email_attachment_name [string]
+
+邮件附件的文件名。默认为 `emailsink.csv`。
+
+### email_field_delimiter [string]
+
+附件文件中用于分隔字段的分隔符。默认为逗号 `,`。
+
### common options
Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md) 了解详情.
@@ -84,6 +94,8 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
email_authorization_code=""
email_message_headline=""
email_message_content=""
+ email_attachment_name="report.csv" # 可选,默认为 emailsink.csv
+ email_field_delimiter="|" # 可选,默认为 ,
}
```
diff --git
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
index 012be8586d..e8ee87a936 100644
---
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
@@ -24,7 +24,9 @@ import lombok.NonNull;
import java.io.Serializable;
+import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_ATTACHMENT_NAME;
import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_AUTHORIZATION_CODE;
+import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_FIELD_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_FROM_ADDRESS;
import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_HOST;
import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_MESSAGE_CONTENT;
@@ -45,6 +47,8 @@ public class EmailSinkConfig implements Serializable {
private String emailTransportProtocol;
private Boolean emailSmtpAuth;
private Integer emailSmtpPort;
+ private String emailAttachmentName;
+ private String emailFieldDelimiter;
public EmailSinkConfig(@NonNull ReadonlyConfig pluginConfig) {
super();
@@ -57,5 +61,7 @@ public class EmailSinkConfig implements Serializable {
this.emailTransportProtocol =
pluginConfig.get(EMAIL_TRANSPORT_PROTOCOL);
this.emailSmtpAuth = pluginConfig.get(EMAIL_SMTP_AUTH);
this.emailSmtpPort = pluginConfig.get(EMAIL_SMTP_PORT);
+ this.emailAttachmentName = pluginConfig.get(EMAIL_ATTACHMENT_NAME);
+ this.emailFieldDelimiter = pluginConfig.get(EMAIL_FIELD_DELIMITER);
}
}
diff --git
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkOptions.java
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkOptions.java
index b3448b9dca..6d357ce5bf 100644
---
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkOptions.java
@@ -74,4 +74,17 @@ public class EmailSinkOptions {
.intType()
.defaultValue(465)
.withDescription("Select port for authentication.");
+
+ public static final Option<String> EMAIL_ATTACHMENT_NAME =
+ Options.key("email_attachment_name")
+ .stringType()
+ .defaultValue("emailsink.csv")
+ .withDescription("The name of the email attachment file");
+
+ public static final Option<String> EMAIL_FIELD_DELIMITER =
+ Options.key("email_field_delimiter")
+ .stringType()
+ .defaultValue(",")
+ .withDescription(
+ "The delimiter used to separate fields in the
attachment file");
}
diff --git
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
index f7fe04c068..4e25b3f976 100644
---
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
@@ -57,26 +57,43 @@ public class EmailSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
private final SeaTunnelRowType seaTunnelRowType;
private final EmailSinkConfig config;
private StringBuffer stringBuffer;
+ private boolean hasData;
public EmailSinkWriter(SeaTunnelRowType seaTunnelRowType, EmailSinkConfig
pluginConfig) {
this.seaTunnelRowType = seaTunnelRowType;
this.config = pluginConfig;
this.stringBuffer = new StringBuffer();
+ this.hasData = false;
}
@Override
public void write(SeaTunnelRow element) {
Object[] fields = element.getFields();
- for (Object field : fields) {
- stringBuffer.append(field.toString() + ",");
+ for (int i = 0; i < fields.length; i++) {
+ Object field = fields[i];
+ // Handle null field values to avoid NPE
+ if (field == null) {
+ stringBuffer.append("");
+ } else {
+ stringBuffer.append(field.toString());
+ }
+ if (i < fields.length - 1) {
+ stringBuffer.append(config.getEmailFieldDelimiter());
+ }
}
- stringBuffer.deleteCharAt(fields.length - 1);
stringBuffer.append("\n");
+ hasData = true;
}
@Override
public void close() {
+ // Only send email if there was data written successfully
+ if (!hasData) {
+ log.info("No data to send, skipping email");
+ return;
+ }
+
createFile();
Properties properties = new Properties();
properties.setProperty("mail.host", config.getEmailHost());
@@ -136,7 +153,7 @@ public class EmailSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
multipart.addBodyPart(messageBodyPart);
// accessory
messageBodyPart = new MimeBodyPart();
- String filename = "emailsink.csv";
+ String filename = config.getEmailAttachmentName();
DataSource source = new FileDataSource(filename);
messageBodyPart.setDataHandler(new DataHandler(source));
messageBodyPart.setFileName(filename);
@@ -153,7 +170,7 @@ public class EmailSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
}
public void createFile() {
- String fileName = "emailsink.csv";
+ String fileName = config.getEmailAttachmentName();
try {
String data = stringBuffer.toString();
File file = new File(fileName);
diff --git
a/seatunnel-connectors-v2/connector-email/src/test/java/org/apache/seatunnel/connectors/seatunnel/email/EmailSinkWriterTest.java
b/seatunnel-connectors-v2/connector-email/src/test/java/org/apache/seatunnel/connectors/seatunnel/email/EmailSinkWriterTest.java
new file mode 100644
index 0000000000..f6ebc553df
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-email/src/test/java/org/apache/seatunnel/connectors/seatunnel/email/EmailSinkWriterTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.email;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.email.sink.EmailSinkWriter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EmailSinkWriterTest {
+
+ @Test
+ void testWriteWithNullValues() {
+ // Create a mock config
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("email_from_address", "[email protected]");
+ configMap.put("email_to_address", "[email protected]");
+ configMap.put("email_authorization_code", "code");
+ configMap.put("email_message_headline", "Test");
+ configMap.put("email_message_content", "Test content");
+ configMap.put("email_host", "smtp.example.com");
+ configMap.put("email_transport_protocol", "smtp");
+ configMap.put("email_smtp_auth", true);
+ configMap.put("email_smtp_port", 465);
+ configMap.put("email_attachment_name", "test.csv");
+ configMap.put("email_field_delimiter", ",");
+
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+ EmailSinkConfig sinkConfig = new EmailSinkConfig(config);
+
+ // Create row type with string fields
+ String[] fieldNames = {"field1", "field2", "field3"};
+ SeaTunnelDataType<?>[] fieldTypes = {
+ BasicType.STRING_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE
+ };
+ SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames,
fieldTypes);
+
+ // Create writer
+ EmailSinkWriter writer = new EmailSinkWriter(rowType, sinkConfig);
+
+ // Test writing row with null values - should not throw NPE
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {"value1", null,
"value3"});
+
+ Assertions.assertDoesNotThrow(() -> writer.write(row));
+
+ // Test writing row with all null values - should not throw NPE
+ SeaTunnelRow nullRow = new SeaTunnelRow(new Object[] {null, null,
null});
+
+ Assertions.assertDoesNotThrow(() -> writer.write(nullRow));
+ }
+
+ @Test
+ void testCustomDelimiter() {
+ // Create a mock config with custom delimiter
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("email_from_address", "[email protected]");
+ configMap.put("email_to_address", "[email protected]");
+ configMap.put("email_authorization_code", "code");
+ configMap.put("email_message_headline", "Test");
+ configMap.put("email_message_content", "Test content");
+ configMap.put("email_host", "smtp.example.com");
+ configMap.put("email_transport_protocol", "smtp");
+ configMap.put("email_smtp_auth", true);
+ configMap.put("email_smtp_port", 465);
+ configMap.put("email_attachment_name", "test.csv");
+ configMap.put("email_field_delimiter", "|");
+
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+ EmailSinkConfig sinkConfig = new EmailSinkConfig(config);
+
+ Assertions.assertEquals("|", sinkConfig.getEmailFieldDelimiter());
+ Assertions.assertEquals("test.csv",
sinkConfig.getEmailAttachmentName());
+ }
+}