This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7a836f2d4 [Feature][Connector-V2][Slack] Add Slack sink connector (#3226)
7a836f2d4 is described below
commit 7a836f2d44560ffb8edd7aa6c8a12b0565b94f2d
Author: Huan Liang <[email protected]>
AuthorDate: Mon Nov 21 22:44:23 2022 +0800
[Feature][Connector-V2][Slack] Add Slack sink connector (#3226)
* [Doc]Fix typos in QuickStart Doc
* [Feature][Connector-V2][Slack] Add Slack sink connector
* [Feature][Connector-V2][Slack] Update pom and Add dependency in seatunnel-dist
* [Feature][Connector-V2][Slack] Update change log
* [Feature][Connector-V2][Slack] Update log info
* [Feature][Connector-V2][Slack] Remove unrelated dependency update
* [Feature][Connector-V2][Slack] Remove unrelated dependency update
* [Feature][Connector-V2][Slack] Update file plugin-mapping.properties
* [Feature][Connector-V2][Slack] Update file plugin-mapping.properties
* [Feature][Connector-V2][Slack] Fix conversationID bug
* [Feature][Connector-V2][Slack] Fix code style
* [Feature][Connector-V2][Slack] Fix code style
* [Feature][Connector-V2][Slack] Fix NPE bug
* [Feature][Connector-V2][Slack] Fix Code Style
* [Feature][Connector-V2][Slack] Update plugin-mapping.properties
* [Feature][Connector-V2][Slack] Update plugin-mapping.properties
* [Feature][Connector-V2][Slack] Update pom.xml
* [Feature][Connector-V2][Slack] Update pom.xml
* [Feature][Connector-V2][Slack] Update pom.xml
* [Feature][Connector-V2][Slack] Fix Code Style
* [Feature][Connector-V2][Slack] Fix Code Style
* [Feature][Connector-V2][Slack] Add slack dependency
* [Feature][Connector-V2][Slack] Remove duplicate parameters check
* [Feature][Connector-V2][Slack] fix code style
* [Feature][Connector-V2][Slack] Implement Factory for connectors
* [Feature][Connector-V2][Slack] Fix code style
* [Feature][Connector-V2][Slack] Fix code style
* [Feature][Connector-V2][Slack] Add Slack Error-Quick-Reference-Manual
Co-authored-by: Eric <[email protected]>
---
.../connector-v2/Error-Quick-Reference-Manual.md | 7 ++
docs/en/connector-v2/sink/Slack.md | 57 ++++++++++++
plugin-mapping.properties | 1 +
seatunnel-connectors-v2/connector-slack/pom.xml | 69 ++++++++++++++
.../seatunnel/slack/client/SlackClient.java | 101 +++++++++++++++++++++
.../seatunnel/slack/config/SlackConfig.java | 47 ++++++++++
.../slack/exception/SlackConnectorErrorCode.java | 44 +++++++++
.../slack/exception/SlackConnectorException.java | 35 +++++++
.../connectors/seatunnel/slack/sink/SlackSink.java | 85 +++++++++++++++++
.../seatunnel/slack/sink/SlackSinkFactory.java | 38 ++++++++
.../seatunnel/slack/sink/SlackWriter.java | 70 ++++++++++++++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 8 +-
13 files changed, 562 insertions(+), 1 deletion(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index e30979da5..d509b952e 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -46,3 +46,10 @@ This document records some common error codes and
corresponding solutions of Sea
| CASSANDRA-03 | Close cql session of cassandra failed | When users
encounter this error code, it means that cassandra has some problems, please
check it whether is work |
| CASSANDRA-04 | No data in source table | When users
encounter this error code, it means that source cassandra table has no data,
please check it |
| CASSANDRA-05 | Parse ip address from string field field | When users
encounter this error code, it means that upstream data does not match ip
address format, please check it
|
+
+## Slack Connector Error Codes
+
+| code | description | solution |
+|----------|------------------------------------------|----------|
+| SLACK-01 | Conversation cannot be found in channels | When users encounter this error code, it means that the channel does not exist in the Slack workspace; please check the channel name |
+| SLACK-02 | Write to slack channel failed | When users encounter this error code, it means that the Slack service has a problem; please check whether it is working |
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/Slack.md
b/docs/en/connector-v2/sink/Slack.md
new file mode 100644
index 000000000..6f011bfc3
--- /dev/null
+++ b/docs/en/connector-v2/sink/Slack.md
@@ -0,0 +1,57 @@
+# Slack
+
+> Slack sink connector
+
+## Description
+
+Used to send data to a Slack channel. Supports both streaming and batch mode.
+> For example, if the data from upstream is [`age: 12, name: huan`], the content sent to the Slack channel is the following: `12,huan`
+
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+| -------------- |--------|----------|---------------|
+| webhooks_url | String | Yes | - |
+| oauth_token | String | Yes | - |
+| slack_channel | String | Yes | - |
+| common-options | | no | - |
+
+### webhooks_url [string]
+
+Slack webhook url
+
+### oauth_token [string]
+
+Slack oauth token used for the actual authentication
+
+### slack_channel [string]
+
+The name of the Slack channel to write data to
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
+
+## Example
+
+```hocon
+sink {
+ SlackSink {
+ webhooks_url = "https://hooks.slack.com/services/xxxxxxxxxxxx/xxxxxxxxxxxx/xxxxxxxxxxxxxxxx"
+ oauth_token = "xoxp-xxxxxxxxxx-xxxxxxxx-xxxxxxxxx-xxxxxxxxxxx"
+ slack_channel = "channel name"
+ }
+}
+```
+
+## Changelog
+
+### new version
+
+- Add Slack Sink Connector
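
Note on the message format described in Slack.md: the writer added later in this commit (SlackWriter.write) joins the field values of each row with commas and appends a newline. The sketch below reproduces only that formatting step so the resulting text can be checked in isolation. The class name RowMessageSketch and the method toMessage are illustrative assumptions and are not part of the commit.

```java
import java.util.StringJoiner;

/** Illustrative stand-in for the row-to-text step; not part of the commit. */
final class RowMessageSketch {

    /** Joins the field values with commas and appends a trailing newline. */
    static String toMessage(Object[] fields) {
        StringJoiner joiner = new StringJoiner(",", "", "\n");
        for (Object field : fields) {
            // String.valueOf renders a null field as the text "null"
            joiner.add(String.valueOf(field));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Row with the fields age=12, name=huan, in schema order
        System.out.print(toMessage(new Object[] {12, "huan"}));
    }
}
```

Field names are not included in the message, so the reader of the channel has to know the column order of the upstream schema.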
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 65271a258..0100e8ffd 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -150,4 +150,5 @@ seatunnel.source.GoogleSheets = connector-google-sheets
seatunnel.sink.Tablestore = connector-tablestore
seatunnel.source.Lemlist = connector-http-lemlist
seatunnel.source.Klaviyo = connector-http-klaviyo
+seatunnel.sink.Slack = connector-slack
seatunnel.source.OneSignal = connector-http-onesignal
diff --git a/seatunnel-connectors-v2/connector-slack/pom.xml
b/seatunnel-connectors-v2/connector-slack/pom.xml
new file mode 100644
index 000000000..2bbb1fff0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-slack/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>connector-slack</artifactId>
+
+ <properties>
+ <httpclient.version>4.5.13</httpclient.version>
+ <slack-api-client>1.25.0</slack-api-client>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/com.slack.api/slack-api-client
-->
+ <dependency>
+ <groupId>com.slack.api</groupId>
+ <artifactId>slack-api-client</artifactId>
+ <version>${slack-api-client}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-stdlib-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-stdlib</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java
new file mode 100644
index 000000000..4a9470174
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.client;
+
+import static
org.apache.seatunnel.connectors.seatunnel.slack.config.SlackConfig.OAUTH_TOKEN;
+import static
org.apache.seatunnel.connectors.seatunnel.slack.config.SlackConfig.SLACK_CHANNEL;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.slack.api.Slack;
+import com.slack.api.methods.MethodsClient;
+import com.slack.api.methods.SlackApiException;
+import com.slack.api.methods.response.chat.ChatPostMessageResponse;
+import com.slack.api.methods.response.conversations.ConversationsListResponse;
+import com.slack.api.model.Conversation;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+
+@Slf4j
+public class SlackClient {
+ private final Config pluginConfig;
+ private final MethodsClient methodsClient;
+
+ public SlackClient(Config pluginConfig) {
+ this.pluginConfig = pluginConfig;
+ this.methodsClient = Slack.getInstance().methods();
+ }
+
+ /**
+ * Find conversation ID using the conversations.list method
+ */
+ public String findConversation() {
+ String conversionId = "";
+ List<Conversation> channels;
+ try {
+ // Get the conversation list
+ ConversationsListResponse conversationsListResponse =
methodsClient.conversationsList(r -> r
+ // The Token used to initialize app
+ .token(pluginConfig.getString(OAUTH_TOKEN.key()))
+ );
+ channels = conversationsListResponse.getChannels();
+ for (Conversation channel : channels) {
+ if
(channel.getName().equals(pluginConfig.getString(SLACK_CHANNEL.key()))) {
+ conversionId = channel.getId();
+ // Break from for loop
+ break;
+ }
+ }
+ } catch (IOException | SlackApiException e) {
+ log.warn("Failed to find Slack conversation.", e);
+ throw new
SlackConnectorException(SlackConnectorErrorCode.FIND_SLACK_CONVERSATION_FAILED,
e);
+ }
+ return conversionId;
+ }
+
+ /**
+ * Post a message to a channel using Channel ID and message text
+ */
+ public boolean publishMessage(String channelId, String text) {
+ boolean publishMessageSuccess = false;
+ try {
+ ChatPostMessageResponse chatPostMessageResponse =
methodsClient.chatPostMessage(r -> r
+ // OAuth token used to authenticate the request
+ .token(pluginConfig.getString(OAUTH_TOKEN.key()))
+ .channel(channelId)
+ .text(text)
+ );
+ publishMessageSuccess = chatPostMessageResponse.isOk();
+ } catch (IOException | SlackApiException e) {
+ log.error("error: {}", ExceptionUtils.getMessage(e));
+ }
+ return publishMessageSuccess;
+ }
+
+ /**
+ * Close the Slack methods client (currently a no-op)
+ */
+ public void closeMethodClient() {
+ }
+}
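
Note on findConversation in SlackClient.java: when no channel name matches, the method returns an empty string, and SLACK-01 is raised only when the API call itself throws. One possible way to make the not-found case explicit is to return an Optional from the lookup, as in the minimal sketch below. The Channel class is a hypothetical stand-in for the SDK's Conversation type, and ChannelLookupSketch and findChannelId are illustrative names, not code from this commit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Illustrative name-to-ID lookup; not part of the commit. */
final class ChannelLookupSketch {

    /** Hypothetical minimal channel type standing in for the SDK's Conversation. */
    static final class Channel {
        final String id;
        final String name;

        Channel(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Returns the ID of the first channel whose name matches, or empty if none does. */
    static Optional<String> findChannelId(List<Channel> channels, String wantedName) {
        for (Channel channel : channels) {
            // Calling equals on wantedName stays safe if a channel has no name
            if (wantedName.equals(channel.name)) {
                return Optional.of(channel.id);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Channel> channels = Arrays.asList(
                new Channel("C001", "general"),
                new Channel("C002", "alerts"));
        System.out.println(findChannelId(channels, "alerts").orElse("<not found>"));
        System.out.println(findChannelId(channels, "missing").orElse("<not found>"));
    }
}
```

With a result of this shape, a caller could turn the empty case into a FIND_SLACK_CONVERSATION_FAILED error before any message is posted, instead of posting to an empty channel ID.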
diff --git
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/config/SlackConfig.java
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/config/SlackConfig.java
new file mode 100644
index 000000000..b7235255f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/config/SlackConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class SlackConfig implements Serializable {
+
+ public static final Option<String> WEBHOOKS_URL =
+ Options.key("webhooks_url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Slack webhooks url");
+
+ public static final Option<String> OAUTH_TOKEN =
+ Options.key("oauth_token")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Slack oauth token");
+
+ public static final Option<String> SLACK_CHANNEL =
+ Options.key("slack_channel")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Slack channel");
+}
diff --git
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorErrorCode.java
new file mode 100644
index 000000000..d124dcca1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorErrorCode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum SlackConnectorErrorCode implements SeaTunnelErrorCode {
+ FIND_SLACK_CONVERSATION_FAILED("SLACK-01", "Conversation cannot be found in channels"),
+ WRITE_TO_SLACK_CHANNEL_FAILED("SLACK-02", "Write to slack channel failed");
+
+ private final String code;
+
+ private final String description;
+
+ SlackConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorException.java
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorException.java
new file mode 100644
index 000000000..a6f443e56
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class SlackConnectorException extends SeaTunnelRuntimeException {
+ public SlackConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public SlackConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public SlackConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
new file mode 100644
index 000000000..a6d3022dd
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.slack.config.SlackConfig;
+import
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+
+
+/**
+ * Slack sink class
+ */
+@AutoService(SeaTunnelSink.class)
+public class SlackSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+ private Config pluginConfig;
+ private SeaTunnelRowType seaTunnelRowType;
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+ return this.seaTunnelRowType;
+ }
+
+ @Override
+ public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) throws IOException {
+ return new SlackWriter(seaTunnelRowType, pluginConfig);
+ }
+
+ @Override
+ public String getPluginName() {
+ return "SlackSink";
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig,
SlackConfig.WEBHOOKS_URL.key(), SlackConfig.OAUTH_TOKEN.key(),
SlackConfig.SLACK_CHANNEL.key());
+ if (!checkResult.isSuccess()) {
+ throw new
SlackConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SINK,
checkResult.getMsg()));
+ }
+ this.pluginConfig = pluginConfig;
+ }
+}
+
+
+
diff --git
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java
new file mode 100644
index 000000000..ccb0572e9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.slack.config.SlackConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class SlackSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Slack";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(SlackConfig.WEBHOOKS_URL,
SlackConfig.OAUTH_TOKEN, SlackConfig.SLACK_CHANNEL).build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java
new file mode 100644
index 000000000..c8201db98
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.slack.client.SlackClient;
+import
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.StringJoiner;
+
+@Slf4j
+public class SlackWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+ private final String conversationId;
+ private final SlackClient slackClient;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private static final long POST_MSG_WAITING_TIME = 1500L;
+
+ public SlackWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.slackClient = new SlackClient(pluginConfig);
+ this.conversationId = slackClient.findConversation();
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ Object[] fields = element.getFields();
+ StringJoiner stringJoiner = new StringJoiner(",", "", "\n");
+ for (Object field : fields) {
+ stringJoiner.add(String.valueOf(field));
+ }
+ String message = stringJoiner.toString();
+ try {
+ slackClient.publishMessage(conversationId, message);
+ // Slack limits how often messages can be posted (roughly one per second),
+ // so wait between sends to stay under that limit
+ Thread.sleep(POST_MSG_WAITING_TIME);
+ } catch (Exception e) {
+ log.error("Write to Slack failed: {}", ExceptionUtils.getMessage(e));
+ throw new
SlackConnectorException(SlackConnectorErrorCode.WRITE_TO_SLACK_CHANNEL_FAILED,
e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+}
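
Note on the fixed Thread.sleep in SlackWriter.write: the writer waits POST_MSG_WAITING_TIME after every message, including the last one, and the catch block does not restore the interrupt flag. A possible alternative, shown here only as a sketch under those observations, is a small throttle that waits just long enough to keep a minimum interval between sends. MinIntervalThrottle and acquire are hypothetical names and are not part of this commit.

```java
import java.util.concurrent.TimeUnit;

/** Illustrative minimum-interval throttle; not part of the commit. */
final class MinIntervalThrottle {
    private final long minIntervalNanos;
    private long lastSendNanos;
    private boolean sentBefore;

    MinIntervalThrottle(long minIntervalMillis) {
        this.minIntervalNanos = TimeUnit.MILLISECONDS.toNanos(minIntervalMillis);
    }

    /**
     * Blocks until the minimum interval since the previous call has passed.
     * Returns false, with the interrupt flag restored, if the wait was interrupted.
     */
    synchronized boolean acquire() {
        if (sentBefore) {
            long remaining = minIntervalNanos - (System.nanoTime() - lastSendNanos);
            if (remaining > 0) {
                try {
                    TimeUnit.NANOSECONDS.sleep(remaining);
                } catch (InterruptedException e) {
                    // Preserve the interrupt so callers can react to shutdown requests
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        lastSendNanos = System.nanoTime();
        sentBefore = true;
        return true;
    }

    public static void main(String[] args) {
        MinIntervalThrottle throttle = new MinIntervalThrottle(1500L);
        for (int i = 0; i < 3; i++) {
            if (throttle.acquire()) {
                // A real writer would post the message here
                System.out.println("send message " + i);
            }
        }
    }
}
```

The first call returns immediately, and later calls wait only for the part of the interval that has not already elapsed, so time spent producing the next row counts toward the delay.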
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 9696421c3..4d8776e9c 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -62,6 +62,7 @@
<module>connector-cassandra</module>
<module>connector-starrocks</module>
<module>connector-google-sheets</module>
+ <module>connector-slack</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index bbdac1d8b..c005860e4 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -351,6 +351,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-slack</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</profile>
<profile>
@@ -442,4 +448,4 @@
</build>
</profile>
</profiles>
-</project>
+</project>
\ No newline at end of file