This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 581292f21 [Improve][Connector-V2][Socket] Unified exception for socket 
source & sink connector (#3511)
581292f21 is described below

commit 581292f210696b4e1122f9053bb6b9ace5c219d0
Author: ChunFu Wu <[email protected]>
AuthorDate: Thu Nov 24 10:20:01 2022 +0800

    [Improve][Connector-V2][Socket] Unified exception for socket source & sink 
connector (#3511)
    
    * [Improve][Connector-V2][Socket] Unified exception for socket source & 
sink connector
---
 .../connector-v2/Error-Quick-Reference-Manual.md   | 10 +++-
 .../socket/{sink => config}/SinkConfig.java        |  2 +-
 .../socket/exception/SocketConnectorErrorCode.java | 51 +++++++++++++++++++
 .../socket/exception/SocketConnectorException.java | 36 +++++++++++++
 .../seatunnel/socket/sink/SocketClient.java        | 59 +++++++---------------
 .../seatunnel/socket/sink/SocketSink.java          | 12 +++--
 .../seatunnel/socket/sink/SocketSinkWriter.java    |  1 +
 .../seatunnel/socket/source/SocketSource.java      | 12 +++--
 .../socket/source/SocketSourceParameter.java       |  4 +-
 .../socket/source/SocketSourceReader.java          |  8 +--
 .../seatunnel/socket/state/SocketState.java        | 23 ---------
 11 files changed, 140 insertions(+), 78 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index d509b952e..17f45742a 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -52,4 +52,12 @@ This document records some common error codes and 
corresponding solutions of Sea
 | code      | description                                 | solution           
                                                                                
                |
 
|-----------|---------------------------------------------|--------------------------------------------------------------------------------------------------------------------|
 | SLACK-01  | Conversation can not be founded in channels | When users 
encounter this error code, it means that the channel is not existed in slack 
workspace, please check it |
-| SLACK-02  | Write to slack channel failed               | When users 
encounter this error code, it means that slack has some problems, please check 
it whether is work       |
\ No newline at end of file
+| SLACK-02  | Write to slack channel failed               | When users 
encounter this error code, it means that slack has some problems, please check 
it whether is work       |
+
+## Socket Connector Error Codes
+
+| code      | description                                              | 
solution                                                                        
                                               |
+|-----------|----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------|
+| SOCKET-01 | Cannot connect to socket server                          | When 
the user encounters this error code, it means that the connection address may 
not match, please check                     |
+| SOCKET-02 | Failed to send message to socket server                  | When 
the user encounters this error code, it means that there is a problem sending 
data and retry is not enabled, please check |
+| SOCKET-03 | Unable to write; interrupted while doing another attempt | When 
the user encounters this error code, it means that the data writing is 
interrupted abnormally, please check               |
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
similarity index 96%
rename from 
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SinkConfig.java
rename to 
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
index 4ebd15210..e06d704b9 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.socket.sink;
+package org.apache.seatunnel.connectors.seatunnel.socket.config;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
 import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.MAX_RETRIES;
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/exception/SocketConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/exception/SocketConnectorErrorCode.java
new file mode 100644
index 000000000..74d6d4c28
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/exception/SocketConnectorErrorCode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum SocketConnectorErrorCode implements SeaTunnelErrorCode {
+
+    SOCKET_SERVER_CONNECT_FAILED("SOCKET-01", "Cannot connect to socket 
server"),
+    SEND_MESSAGE_TO_SOCKET_SERVER_FAILED("SOCKET-02", "Failed to send message 
to socket server"),
+    SOCKET_WRITE_FAILED("SOCKET-03", "Unable to write; interrupted while doing 
another attempt");
+
+    private final String code;
+
+    private final String description;
+
+    SocketConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+
+    @Override
+    public String getErrorMessage() {
+        return SeaTunnelErrorCode.super.getErrorMessage();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/exception/SocketConnectorException.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/exception/SocketConnectorException.java
new file mode 100644
index 000000000..ff9a3d308
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/exception/SocketConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class SocketConnectorException extends SeaTunnelRuntimeException {
+
+    public SocketConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public SocketConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public SocketConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
index 3e83044b3..fb485134d 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
@@ -19,6 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.socket.sink;
 
 import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorException;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -61,7 +64,9 @@ public class SocketClient {
                 createConnection();
             }
         } catch (IOException e) {
-            throw new IOException("Cannot connect to socket server at " + 
hostName + ":" + port, e);
+            throw new 
SocketConnectorException(SocketConnectorErrorCode.SOCKET_SERVER_CONNECT_FAILED,
+                String.format("Cannot connect to socket server at %s:%d",
+                hostName, port), e);
         }
     }
 
@@ -70,37 +75,22 @@ public class SocketClient {
         try {
             outputStream.write(msg);
             outputStream.flush();
-
         } catch (IOException e) {
             // if no re-tries are enable, fail immediately
             if (maxNumRetries == 0) {
-                throw new IOException(
-                        "Failed to send message '"
-                                + row
-                                + "' to socket server at "
-                                + hostName
-                                + ":"
-                                + port
-                                + ". Connection re-tries are not enabled.",
-                        e);
+                throw new 
SocketConnectorException(SocketConnectorErrorCode.SEND_MESSAGE_TO_SOCKET_SERVER_FAILED,
+                    String.format("Failed to send message '%s' to socket 
server at %s:%d. Connection re-tries are not enabled.",
+                    row, hostName, port), e);
             }
 
             log.error(
-                    "Failed to send message '"
-                            + row
-                            + "' to socket server at "
-                            + hostName
-                            + ":"
-                            + port
-                            + ". Trying to reconnect...",
-                    e);
+                "Failed to send message '{}' to socket server at {}:{}. Trying 
to reconnect...",
+                row, hostName, port, e);
 
             synchronized (SocketClient.class) {
                 IOException lastException = null;
                 retries = 0;
-
                 while (isRunning && (maxNumRetries < 0 || retries < 
maxNumRetries)) {
-
                     // first, clean up the old resources
                     try {
                         if (outputStream != null) {
@@ -127,33 +117,22 @@ public class SocketClient {
                         return;
                     } catch (IOException ee) {
                         lastException = ee;
-                        log.error(
-                                "Re-connect to socket server and send message 
failed. Retry time(s): "
-                                        + retries,
-                                ee);
+                        log.error("Re-connect to socket server and send 
message failed. Retry time(s): {}",
+                            retries, ee);
                     }
                     try {
                         this.wait(CONNECTION_RETRY_DELAY);
-                    }
-                    catch (InterruptedException ex) {
+                    } catch (InterruptedException ex) {
                         Thread.currentThread().interrupt();
-                        throw new IOException(
-                                "unable to write; interrupted while doing 
another attempt", e);
+                        throw new 
SocketConnectorException(SocketConnectorErrorCode.SOCKET_WRITE_FAILED,
+                            "unable to write; interrupted while doing another 
attempt", e);
                     }
                 }
 
                 if (isRunning) {
-                    throw new IOException(
-                            "Failed to send message '"
-                                    + row
-                                    + "' to socket server at "
-                                    + hostName
-                                    + ":"
-                                    + port
-                                    + ". Failed after "
-                                    + retries
-                                    + " retries.",
-                            lastException);
+                    throw new 
SocketConnectorException(SocketConnectorErrorCode.SEND_MESSAGE_TO_SOCKET_SERVER_FAILED,
+                        String.format("Failed to send message '%s' to socket 
server at %s:%d. Failed after %d retries.",
+                        row, hostName, port, retries), lastException);
                 }
             }
         }
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
index 1bc88fa5a..51ad05420 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
@@ -21,6 +21,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSink
 import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -31,6 +32,8 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -54,7 +57,10 @@ public class SocketSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.pluginConfig = pluginConfig;
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
PORT.key(), HOST.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SINK, 
result.getMsg());
+            throw new 
SocketConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    getPluginName(), PluginType.SINK, result.getMsg())
+            );
         }
         sinkConfig = new SinkConfig(pluginConfig);
     }
@@ -65,12 +71,12 @@ public class SocketSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     }
 
     @Override
-    public SeaTunnelDataType getConsumedType() {
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
         return seaTunnelRowType;
     }
 
     @Override
-    public AbstractSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
+    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) throws IOException {
         return new SocketSinkWriter(sinkConfig, seaTunnelRowType);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
index 577835927..c901abfc1 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
@@ -21,6 +21,7 @@ import 
org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
 import java.io.IOException;
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
index 82fab08bf..3939eb064 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
@@ -22,6 +22,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSink
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -35,6 +36,7 @@ import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import 
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -59,7 +61,10 @@ public class SocketSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
PORT.key(), HOST.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+            throw new 
SocketConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    getPluginName(), PluginType.SOURCE, result.getMsg())
+            );
         }
         this.parameter = new SocketSourceParameter(pluginConfig);
     }
@@ -71,12 +76,11 @@ public class SocketSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return new SeaTunnelRowType(new String[]{"value"}, new 
SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+        return new SeaTunnelRowType(new String[] {"value"}, new 
SeaTunnelDataType<?>[] {BasicType.STRING_TYPE});
     }
 
     @Override
-    public AbstractSingleSplitReader<SeaTunnelRow> 
createReader(SingleSplitReaderContext readerContext)
-        throws Exception {
+    public AbstractSingleSplitReader<SeaTunnelRow> 
createReader(SingleSplitReaderContext readerContext) throws Exception {
         return new SocketSourceReader(this.parameter, readerContext);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
index 09d001787..cb9279861 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
@@ -28,8 +28,8 @@ import java.io.Serializable;
 import java.util.Objects;
 
 public class SocketSourceParameter implements Serializable {
-    private String host;
-    private Integer port;
+    private final String host;
+    private final Integer port;
 
     public String getHost() {
         return StringUtils.isBlank(host) ? HOST.defaultValue() : host;
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
index 8a86fa47b..34bd2b8f1 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
@@ -37,7 +37,7 @@ public class SocketSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow>
     private final SocketSourceParameter parameter;
     private final SingleSplitReaderContext context;
     private Socket socket;
-    private String delimiter = "\n";
+    private final String delimiter = "\n";
 
     SocketSourceReader(SocketSourceParameter parameter, 
SingleSplitReaderContext context) {
         this.parameter = parameter;
@@ -70,10 +70,10 @@ public class SocketSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow>
                 int delimPos;
                 while (buffer.length() >= this.delimiter.length() && (delimPos 
= buffer.indexOf(this.delimiter)) != -1) {
                     String record = buffer.substring(0, delimPos);
-                    if (this.delimiter.equals("\n") && record.endsWith("\r")) {
+                    if (record.endsWith("\r")) {
                         record = record.substring(0, record.length() - 1);
                     }
-                    output.collect(new SeaTunnelRow(new Object[]{record}));
+                    output.collect(new SeaTunnelRow(new Object[] {record}));
                     buffer.delete(0, delimPos + this.delimiter.length());
                 }
                 if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
@@ -84,7 +84,7 @@ public class SocketSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow>
             }
         }
         if (buffer.length() > 0) {
-            output.collect(new SeaTunnelRow(new Object[]{buffer.toString()}));
+            output.collect(new SeaTunnelRow(new Object[] {buffer.toString()}));
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/state/SocketState.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/state/SocketState.java
deleted file mode 100644
index f5d582192..000000000
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/state/SocketState.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.socket.state;
-
-import java.io.Serializable;
-
-public class SocketState implements Serializable {
-}

Reply via email to