This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3e200e0a3 [Feature][Connector-V2] Add Enterprise Wechat sink
connector (#2412)
3e200e0a3 is described below
commit 3e200e0a38b1358c025b7c5e08572eccfd10fbb1
Author: 531651225 <[email protected]>
AuthorDate: Wed Aug 17 10:35:46 2022 +0800
[Feature][Connector-V2] Add Enterprise Wechat sink connector (#2412)
* [Feature][Connector-V2] Add Enterprise WeChart source output format
---
docs/en/connector-v2/sink/Enterprise-WeChat.md | 53 ++++++++++++++++
seatunnel-connectors-v2-dist/pom.xml | 5 ++
.../{ => connector-http-wechat}/pom.xml | 16 ++---
.../wechat/sink/WeChatHttpSinkWriter.java | 70 ++++++++++++++++++++++
.../seatunnel/wechat/sink/WeChatSink.java | 60 +++++++++++++++++++
.../wechat/sink/config/WeChatSinkConfig.java | 45 ++++++++++++++
seatunnel-connectors-v2/connector-http/pom.xml | 1 +
7 files changed, 243 insertions(+), 7 deletions(-)
diff --git a/docs/en/connector-v2/sink/Enterprise-WeChat.md
b/docs/en/connector-v2/sink/Enterprise-WeChat.md
new file mode 100644
index 000000000..303648212
--- /dev/null
+++ b/docs/en/connector-v2/sink/Enterprise-WeChat.md
@@ -0,0 +1,53 @@
+# Enterprise WeChat
+
+> Enterprise WeChat sink connector
+
+## Description
+
+A sink plugin that uses an Enterprise WeChat robot to send messages.
+> For example, if the data from upstream is [`"alarmStatus": "firing",
"alarmTime": "2022-08-03 01:38:49","alarmContent": "The disk usage exceeds the
threshold"`], the output content to WeChat Robot is the following:
+> ```
+> alarmStatus: firing
+> alarmTime: 2022-08-03 01:38:49
+> alarmContent: The disk usage exceeds the threshold
+> ```
+**Tips: WeChat sink only supports `string` webhook, and the data from the source
will be treated as the body content of the webhook.**
+
+
+## Options
+
+| name | type | required | default value |
+| --- |--------|----------| --- |
+| url | String | Yes | - |
+| mentioned_list | array | No | - |
+| mentioned_mobile_list | array | No | - |
+
+### url [string]
+
+The Enterprise WeChat webhook URL, in the format
https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=XXXXXX (string)
+
+### mentioned_list [array]
+
+A list of userids of the group members to mention (@ a member); `@all` mentions
everyone. If the userid cannot be obtained, use `mentioned_mobile_list`
instead.
+
+### mentioned_mobile_list [array]
+
+A list of mobile phone numbers; the group member registered with each number is
mentioned (@ a member). `@all` mentions everyone.
+
+## Example
+
+simple:
+
+```hocon
+WeChat {
+ url =
"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=693axxx6-7aoc-4bc4-97a0-0ec2sifa5aaa"
+ }
+```
+
+```hocon
+WeChat {
+ url =
"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=693axxx6-7aoc-4bc4-97a0-0ec2sifa5aaa"
+ mentioned_list=["wangqing","@all"]
+ mentioned_mobile_list=["13800001111","@all"]
+ }
+```
diff --git a/seatunnel-connectors-v2-dist/pom.xml
b/seatunnel-connectors-v2-dist/pom.xml
index 00f4af6d5..0f50bba0e 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -61,6 +61,11 @@
<artifactId>connector-http-feishu</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-http-wechat</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-jdbc</artifactId>
diff --git a/seatunnel-connectors-v2/connector-http/pom.xml
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/pom.xml
similarity index 77%
copy from seatunnel-connectors-v2/connector-http/pom.xml
copy to seatunnel-connectors-v2/connector-http/connector-http-wechat/pom.xml
index e8fbcadc1..991eea99f 100644
--- a/seatunnel-connectors-v2/connector-http/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/connector-http-wechat/pom.xml
@@ -21,17 +21,19 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel-connectors-v2</artifactId>
+ <artifactId>connector-http</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>connector-http</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>connector-http-base</module>
- <module>connector-http-feishu</module>
- </modules>
+ <artifactId>connector-http-wechat</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-http-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
new file mode 100644
index 000000000..44ecab013
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class WeChatHttpSinkWriter extends HttpSinkWriter { // Sink writer that turns every upstream row into one WeChat "text" webhook message.
+ // Formatting happens in this class; the actual sending is delegated to HttpSinkWriter.write().
+ private final WeChatSinkConfig weChatSinkConfig; // optional mention lists read from the plugin config
+ private final SeaTunnelRowType seaTunnelRowType; // schema of the upstream rows passed to write()
+ // pluginConfig is only used to build WeChatSinkConfig; seaTunnelRowType is the upstream schema read in write().
+ public WeChatHttpSinkWriter(HttpParameter httpParameter, Config
pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+ // The parent gets a fixed two-field row type ("msgtype", "text") matching the wrapper row built in write(), not the upstream schema.
+ super(new SeaTunnelRowType(new
String[]{WeChatSinkConfig.WECHAT_SEND_MSG_TYPE_KEY,
WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE}, new
SeaTunnelDataType[]{BasicType.VOID_TYPE, BasicType.VOID_TYPE}), httpParameter);
+ this.weChatSinkConfig = new WeChatSinkConfig(pluginConfig);
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+ // Formats the row as "name: value" pairs, wraps the text in a two-value (msgtype, text) row and passes that row to the parent writer.
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ StringBuffer stringBuffer = new StringBuffer();
+ int totalFields = seaTunnelRowType.getTotalFields();
+ for (int i = 0; i < totalFields; i++) {
+ stringBuffer.append(seaTunnelRowType.getFieldName(i) + ": " +
element.getField(i) + "\\n"); // "\\n" is the two characters backslash + n, not a line break
+ }
+ if (totalFields > 0) {
+ // Drop the trailing separator: it is two characters long, hence length() - 2. NOTE(review): confirm a literal backslash-n (rather than a real line break) is what the webhook expects.
+ stringBuffer.delete(stringBuffer.length() - 2,
stringBuffer.length());
+ }
+ HashMap<Object, Object> objectMap = new HashMap<>(); // message body: content plus the optional mention lists
+ objectMap.put(WeChatSinkConfig.WECHAT_SEND_MSG_CONTENT_KEY,
stringBuffer.toString());
+ if (!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedList())) { // isEmpty also covers a null (unconfigured) list
+ objectMap.put(WeChatSinkConfig.MENTIONED_LIST,
weChatSinkConfig.getMentionedList());
+ }
+ if
(!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedMobileList())) {
+ objectMap.put(WeChatSinkConfig.MENTIONED_MOBILE_LIST,
weChatSinkConfig.getMentionedMobileList());
+ }
+ // Wrapper row whose two values ("text", body map) line up with the ("msgtype", "text") row type given to the parent in the constructor.
+ SeaTunnelRow wechatRowWrapper = new SeaTunnelRow(new
Object[]{WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE, objectMap});
+ super.write(wechatRowWrapper); // NOTE(review): presumably serialized and posted by HttpSinkWriter — confirm against the parent class
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
new file mode 100644
index 000000000..64b6d97b4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class WeChatSink extends HttpSink { // HttpSink variant exposed under the plugin name "WeChat"; its writers are WeChatHttpSinkWriter instances.
+
+ @Override
+ public String getPluginName() {
+ return "WeChat";
+ }
+ // Captured in prepare() so createWriter() can hand it to the writer.
+ private Config pluginConfig;
+ // Captured in setTypeInfo() so createWriter() can hand it to the writer.
+ private SeaTunnelRowType seaTunnelRowType;
+ // Remembers the plugin config, then delegates to HttpSink.prepare().
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ this.pluginConfig = pluginConfig;
+ super.prepare(pluginConfig);
+ }
+ // Remembers the upstream row type, then delegates to HttpSink.setTypeInfo().
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ super.setTypeInfo(seaTunnelRowType);
+ }
+ // Builds the writer from the parent's httpParameter plus the config and row type captured above; the context argument is not used.
+ @Override
+ public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
+ return new WeChatHttpSinkWriter(super.httpParameter, pluginConfig,
seaTunnelRowType); // NOTE(review): httpParameter is presumably populated by HttpSink.prepare() — confirm
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
new file mode 100644
index 000000000..be2baab60
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.wechat.sink.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+import lombok.NonNull;
+
+import java.util.List;
+
+@Data
+public class WeChatSinkConfig { // Payload key constants plus the two optional mention lists read from the plugin config.
+ public static final String WECHAT_SEND_MSG_SUPPORT_TYPE = "text"; // used both as the msgtype value and as the name of the field holding the message body
+ public static final String WECHAT_SEND_MSG_TYPE_KEY = "msgtype"; // name of the message-type field
+ public static final String WECHAT_SEND_MSG_CONTENT_KEY = "content"; // key of the formatted row text inside the message body
+ public static final String MENTIONED_LIST = "mentioned_list"; // config path and message-body key
+ public static final String MENTIONED_MOBILE_LIST = "mentioned_mobile_list"; // config path and message-body key
+ private List<String> mentionedList; // stays null when the option is not configured
+ private List<String> mentionedMobileList; // stays null when the option is not configured
+ // Reads each list only if its path exists in the config; pluginConfig is annotated with Lombok's @NonNull.
+ public WeChatSinkConfig(@NonNull Config pluginConfig){
+ if (pluginConfig.hasPath(MENTIONED_LIST)) {
+ this.mentionedList = pluginConfig.getStringList(MENTIONED_LIST);
+ }
+ if (pluginConfig.hasPath(MENTIONED_MOBILE_LIST)) {
+ this.mentionedMobileList =
pluginConfig.getStringList(MENTIONED_MOBILE_LIST);
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-http/pom.xml
b/seatunnel-connectors-v2/connector-http/pom.xml
index e8fbcadc1..5b699bef0 100644
--- a/seatunnel-connectors-v2/connector-http/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/pom.xml
@@ -32,6 +32,7 @@
<modules>
<module>connector-http-base</module>
<module>connector-http-feishu</module>
+ <module>connector-http-wechat</module>
</modules>
</project>
\ No newline at end of file