This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7a836f2d4 [Feature][Connector-V2][Slack] Add Slack sink connector (#3226)
7a836f2d4 is described below

commit 7a836f2d44560ffb8edd7aa6c8a12b0565b94f2d
Author: Huan Liang <[email protected]>
AuthorDate: Mon Nov 21 22:44:23 2022 +0800

    [Feature][Connector-V2][Slack] Add Slack sink connector  (#3226)
    
    * [Doc]Fix typos in QuickStart Doc
    
    * [Feature][Connector-V2][Slack] Add Slack sink connector
    
    * [Feature][Connector-V2][Slack] Update pom and Add dependency in seatunnel-dist
    
    * [Feature][Connector-V2][Slack] Update change log
    
    * [Feature][Connector-V2][Slack] Update log info
    
    * [Feature][Connector-V2][Slack] Remove unrelative dependience update
    
    * [Feature][Connector-V2][Slack] Remove unrelative dependience update
    
    * [Feature][Connector-V2][Slack] Upate file plugin-mapping.properties
    
    * [Feature][Connector-V2][Slack] Upate file plugin-mapping.properties
    
    * [Feature][Connector-V2][Slack] Fix conversationID bug
    
    * [Feature][Connector-V2][Slack] Fix code style
    
    * [Feature][Connector-V2][Slack] Fix code style
    
    * [Feature][Connector-V2][Slack] Fix NPE bug
    
    * [Feature][Connector-V2][Slack] Fix Code Style
    
    * [Feature][Connector-V2][Slack] Update plugin-mapping.properties
    
    * [Feature][Connector-V2][Slack] Update plugin-mapping.properties
    
    * [Feature][Connector-V2][Slack] Update pom.xml
    
    * [Feature][Connector-V2][Slack] Update pom.xml
    
    * [Feature][Connector-V2][Slack] Update pom.xml
    
    * [Feature][Connector-V2][Slack] Fix Code Style
    
    * [Feature][Connector-V2][Slack] Fix Code Style
    
    * [Feature][Connector-V2][Slack] Add slack dependency
    
    * [Feature][Connector-V2][Slack] Remove duplicate parameters check
    
    * [Feature][Connector-V2][Slack] fix code style
    
    * [Feature][Connector-V2][Slack] Implement Factory for connectors
    
    * [Feature][Connector-V2][Slack] Fix code style
    
    * [Feature][Connector-V2][Slack] Fix code style
    
    * [Feature][Connector-V2][Slack] Add Slack Error-Quick-Reference-Manual
    
    Co-authored-by: Eric <[email protected]>
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |   7 ++
 docs/en/connector-v2/sink/Slack.md                 |  57 ++++++++++++
 plugin-mapping.properties                          |   1 +
 seatunnel-connectors-v2/connector-slack/pom.xml    |  69 ++++++++++++++
 .../seatunnel/slack/client/SlackClient.java        | 101 +++++++++++++++++++++
 .../seatunnel/slack/config/SlackConfig.java        |  47 ++++++++++
 .../slack/exception/SlackConnectorErrorCode.java   |  44 +++++++++
 .../slack/exception/SlackConnectorException.java   |  35 +++++++
 .../connectors/seatunnel/slack/sink/SlackSink.java |  85 +++++++++++++++++
 .../seatunnel/slack/sink/SlackSinkFactory.java     |  38 ++++++++
 .../seatunnel/slack/sink/SlackWriter.java          |  70 ++++++++++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   8 +-
 13 files changed, 562 insertions(+), 1 deletion(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index e30979da5..d509b952e 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -46,3 +46,10 @@ This document records some common error codes and corresponding solutions of Sea
 | CASSANDRA-03 | Close cql session of cassandra failed           | When users encounter this error code, it means that cassandra has some problems, please check it whether is work                                                      |
 | CASSANDRA-04 | No data in source table                         | When users encounter this error code, it means that source cassandra table has no data, please check it                                                               |
 | CASSANDRA-05 | Parse ip address from string field field        | When users encounter this error code, it means that upstream data does not match ip address format, please check it                                                   |
+
+## Slack Connector Error Codes
+
+| code      | description                                 | solution                                                                                                               |
+|-----------|---------------------------------------------|------------------------------------------------------------------------------------------------------------------------|
+| SLACK-01  | Conversation cannot be found in channels    | When users encounter this error code, it means that the channel does not exist in the Slack workspace, please check it |
+| SLACK-02  | Write to slack channel failed               | When users encounter this error code, it means that Slack has some problems, please check whether it is working        |
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/Slack.md b/docs/en/connector-v2/sink/Slack.md
new file mode 100644
index 000000000..6f011bfc3
--- /dev/null
+++ b/docs/en/connector-v2/sink/Slack.md
@@ -0,0 +1,57 @@
+# Slack
+
+> Slack sink connector
+
+## Description
+
+Used to send data to a Slack channel. Supports both streaming and batch mode.
+> For example, if the data from upstream is [`age: 12, name: huan`], the content sent to the Slack channel is the following: `12,huan`
+
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name           | type   | required | default value |
+| -------------- |--------|----------|---------------|
+| webhooks_url   | String | Yes      | -             |
+| oauth_token    | String | Yes      | -             |
+| slack_channel  | String | Yes      | -             |
+| common-options |        | no       | -             |
+
+### webhooks_url [string]
+
+Slack webhook url
+
+### oauth_token [string]
+
+Slack oauth token used for the actual authentication
+
+### slack_channel [string]
+
+slack channel for data write
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
+
+## Example
+
+```hocon
+sink {
+ SlackSink {
+  webhooks_url = "https://hooks.slack.com/services/xxxxxxxxxxxx/xxxxxxxxxxxx/xxxxxxxxxxxxxxxx"
+  oauth_token = "xoxp-xxxxxxxxxx-xxxxxxxx-xxxxxxxxx-xxxxxxxxxxx"
+  slack_channel = "channel name"
+ }
+}
+```
+
+## Changelog
+
+### new version
+
+- Add Slack Sink Connector
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 65271a258..0100e8ffd 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -150,4 +150,5 @@ seatunnel.source.GoogleSheets = connector-google-sheets
 seatunnel.sink.Tablestore = connector-tablestore
 seatunnel.source.Lemlist = connector-http-lemlist
 seatunnel.source.Klaviyo = connector-http-klaviyo
+seatunnel.sink.Slack = connector-slack
 seatunnel.source.OneSignal = connector-http-onesignal
diff --git a/seatunnel-connectors-v2/connector-slack/pom.xml b/seatunnel-connectors-v2/connector-slack/pom.xml
new file mode 100644
index 000000000..2bbb1fff0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-slack/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-slack</artifactId>
+
+    <properties>
+        <httpclient.version>4.5.13</httpclient.version>
+        <slack-api-client>1.25.0</slack-api-client>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpclient.version}</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/com.slack.api/slack-api-client -->
+        <dependency>
+            <groupId>com.slack.api</groupId>
+            <artifactId>slack-api-client</artifactId>
+            <version>${slack-api-client}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.jetbrains.kotlin</groupId>
+                    <artifactId>kotlin-stdlib-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jetbrains.kotlin</groupId>
+                    <artifactId>kotlin-stdlib</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java
new file mode 100644
index 000000000..4a9470174
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.client;
+
+import static org.apache.seatunnel.connectors.seatunnel.slack.config.SlackConfig.OAUTH_TOKEN;
+import static org.apache.seatunnel.connectors.seatunnel.slack.config.SlackConfig.SLACK_CHANNEL;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.slack.api.Slack;
+import com.slack.api.methods.MethodsClient;
+import com.slack.api.methods.SlackApiException;
+import com.slack.api.methods.response.chat.ChatPostMessageResponse;
+import com.slack.api.methods.response.conversations.ConversationsListResponse;
+import com.slack.api.model.Conversation;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+
+@Slf4j
+public class SlackClient {
+    private final Config pluginConfig;
+    private final MethodsClient methodsClient;
+
+    public SlackClient(Config pluginConfig) {
+        this.pluginConfig = pluginConfig;
+        this.methodsClient = Slack.getInstance().methods();
+    }
+
+    /**
+     * Find conversation ID using the conversations.list method
+     */
+    public String findConversation() {
+        String conversationId = "";
+        List<Conversation> channels;
+        try {
+            // Get the conversation list
+            ConversationsListResponse conversationsListResponse = methodsClient.conversationsList(r -> r
+                // The token used to initialize the app
+                .token(pluginConfig.getString(OAUTH_TOKEN.key()))
+            );
+            channels = conversationsListResponse.getChannels();
+            for (Conversation channel : channels) {
+                if (channel.getName().equals(pluginConfig.getString(SLACK_CHANNEL.key()))) {
+                    conversationId = channel.getId();
+                    // Stop at the first matching channel
+                    break;
+                }
+            }
+        } catch (IOException | SlackApiException e) {
+            log.warn("Failed to find Slack conversation.", e);
+            throw new SlackConnectorException(SlackConnectorErrorCode.FIND_SLACK_CONVERSATION_FAILED, e);
+        }
+        return conversationId;
+    }
+
+    /**
+     * Post a message to a channel using Channel ID and message text
+     */
+    public boolean publishMessage(String channelId, String text) {
+        boolean publishMessageSuccess = false;
+        try {
+            ChatPostMessageResponse chatPostMessageResponse = methodsClient.chatPostMessage(r -> r
+                // Authenticate with the OAuth token (not the channel name)
+                .token(pluginConfig.getString(OAUTH_TOKEN.key()))
+                .channel(channelId)
+                .text(text)
+            );
+            publishMessageSuccess = chatPostMessageResponse.isOk();
+        } catch (IOException | SlackApiException e) {
+            log.error("error: {}", ExceptionUtils.getMessage(e));
+        }
+        return publishMessageSuccess;
+    }
+
+    /**
+     * Close Conversion
+     */
+    public void closeMethodClient() {
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/config/SlackConfig.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/config/SlackConfig.java
new file mode 100644
index 000000000..b7235255f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/config/SlackConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class SlackConfig implements Serializable {
+
+    public static final Option<String> WEBHOOKS_URL =
+            Options.key("webhooks_url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Slack webhooks url");
+
+    public static final Option<String> OAUTH_TOKEN =
+            Options.key("oauth_token")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Slack oauth token");
+
+    public static final Option<String> SLACK_CHANNEL =
+            Options.key("slack_channel")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Slack channel");
+}
diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorErrorCode.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorErrorCode.java
new file mode 100644
index 000000000..d124dcca1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorErrorCode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum SlackConnectorErrorCode implements SeaTunnelErrorCode {
+    FIND_SLACK_CONVERSATION_FAILED("SLACK-01", "Conversation cannot be found in channels"),
+    WRITE_TO_SLACK_CHANNEL_FAILED("SLACK-02", "Write to slack channel failed");
+
+    private final String code;
+
+    private final String description;
+
+    SlackConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorException.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorException.java
new file mode 100644
index 000000000..a6f443e56
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class SlackConnectorException extends SeaTunnelRuntimeException {
+    public SlackConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public SlackConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public SlackConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
new file mode 100644
index 000000000..a6d3022dd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.slack.config.SlackConfig;
+import org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+
+
+/**
+ * Slack sink class
+ */
+@AutoService(SeaTunnelSink.class)
+public class SlackSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    private Config pluginConfig;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
+        return new SlackWriter(seaTunnelRowType, pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "SlackSink";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig, SlackConfig.WEBHOOKS_URL.key(), SlackConfig.OAUTH_TOKEN.key(), SlackConfig.SLACK_CHANNEL.key());
+        if (!checkResult.isSuccess()) {
+            throw new SlackConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SINK, checkResult.getMsg()));
+        }
+        this.pluginConfig = pluginConfig;
+    }
+}
+
+
+
diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java
new file mode 100644
index 000000000..ccb0572e9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.slack.config.SlackConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class SlackSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Slack";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(SlackConfig.WEBHOOKS_URL, SlackConfig.OAUTH_TOKEN, SlackConfig.SLACK_CHANNEL).build();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java
new file mode 100644
index 000000000..c8201db98
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.slack.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.slack.client.SlackClient;
+import org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.StringJoiner;
+
+@Slf4j
+public class SlackWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+    private final String conversationId;
+    private final SlackClient slackClient;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private static final long POST_MSG_WAITING_TIME = 1500L;
+
+    public SlackWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.slackClient = new SlackClient(pluginConfig);
+        this.conversationId = slackClient.findConversation();
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        Object[] fields = element.getFields();
+        StringJoiner stringJoiner = new StringJoiner(",", "", "\n");
+        for (Object field : fields) {
+            stringJoiner.add(String.valueOf(field));
+        }
+        String message = stringJoiner.toString();
+        try {
+            slackClient.publishMessage(conversationId, message);
+            // Slack rate-limits message posting to roughly one message per second per channel,
+            // so wait between messages
+            Thread.sleep(POST_MSG_WAITING_TIME);
+        } catch (Exception e) {
+            log.error("Write to Slack failed: {}", ExceptionUtils.getMessage(e));
+            throw new SlackConnectorException(SlackConnectorErrorCode.WRITE_TO_SLACK_CHANNEL_FAILED, e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 9696421c3..4d8776e9c 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -62,6 +62,7 @@
         <module>connector-cassandra</module>
         <module>connector-starrocks</module>
         <module>connector-google-sheets</module>
+        <module>connector-slack</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index bbdac1d8b..c005860e4 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -351,6 +351,12 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-slack</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
             </dependencies>
         </profile>
         <profile>
@@ -442,4 +448,4 @@
             </build>
         </profile>
     </profiles>
-</project>
+</project>
\ No newline at end of file
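
Note on the message format: `SlackWriter.write` in the patch above joins the row fields with commas and terminates the message with a newline. A minimal standalone sketch of that joining step follows, assuming nothing beyond the JDK; the class `RowMessageSketch` and the method `formatRow` are hypothetical names used only for illustration and are not part of this commit.

```java
import java.util.StringJoiner;

public class RowMessageSketch {

    // Hypothetical helper mirroring the joining logic in SlackWriter.write:
    // fields are separated by commas, no prefix, and a newline suffix.
    static String formatRow(Object[] fields) {
        StringJoiner joiner = new StringJoiner(",", "", "\n");
        for (Object field : fields) {
            // String.valueOf renders a null field as the text "null"
            joiner.add(String.valueOf(field));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // A row with the fields age = 12 and name = huan
        System.out.print(formatRow(new Object[] {12, "huan"}));
    }
}
```

Under these assumptions, field names are not part of the message, only the values in row order, so a consumer reading the channel has to know the upstream schema to interpret each line.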
