This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 8d5712c1d [Improve] Improve StarRocks Sink Config (#4212)
8d5712c1d is described below

commit 8d5712c1dbf33cddc814e8ae5b3283daff31c9be
Author: Hisoka <[email protected]>
AuthorDate: Fri Feb 24 22:26:15 2023 +0800

    [Improve] Improve StarRocks Sink Config (#4212)
---
 .../connectors/seatunnel/starrocks/config/SinkConfig.java          | 4 +---
 .../seatunnel/starrocks/config/StarRocksSinkOptions.java           | 7 +------
 .../connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java  | 2 +-
 .../seatunnel/engine/server/task/SourceSplitEnumeratorTask.java    | 5 +++--
 4 files changed, 6 insertions(+), 12 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 2095b9988..f00369cdf 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -64,9 +64,7 @@ public class SinkConfig implements Serializable {
         SinkConfig sinkConfig = new SinkConfig();
         sinkConfig.setNodeUrls(config.get(StarRocksSinkOptions.NODE_URLS));
         sinkConfig.setDatabase(config.get(StarRocksSinkOptions.DATABASE));
-        sinkConfig.setJdbcUrl(String.format("jdbc:mysql://%s:%d",
-            sinkConfig.getNodeUrls().get(0).split(":")[0],
-            config.get(StarRocksSinkOptions.QUERY_PORT)));
+        sinkConfig.setJdbcUrl(config.get(StarRocksOptions.BASE_URL));
         
config.getOptional(StarRocksOptions.USERNAME).ifPresent(sinkConfig::setUsername);
         
config.getOptional(StarRocksOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
         
config.getOptional(StarRocksSinkOptions.TABLE).ifPresent(sinkConfig::setTable);
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index 5a00b0e76..ee89f6cc2 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -29,7 +29,7 @@ public interface StarRocksSinkOptions {
     Option<List<String>> NODE_URLS = Options.key("nodeUrls")
         .listType()
         .noDefaultValue()
-        .withDescription("StarRocks cluster address, the format is 
[\"fe_ip:fe_http_port\", ...]");
+        .withDescription("StarRocks cluster http address, the format is 
[\"fe_ip:fe_http_port\", ...]");
 
     Option<String> LABEL_PREFIX = Options.key("labelPrefix")
         .stringType()
@@ -58,11 +58,6 @@ public interface StarRocksSinkOptions {
             "    \"replication_num\" = \"1\" \n" +
             ")").withDescription("Create table statement template, used to 
create StarRocks table");
 
-    Option<Integer> QUERY_PORT = Options.key("query_port")
-        .intType()
-        .defaultValue(9030)
-        .withDescription("FE MySQL server port");
-
     Option<Integer> BATCH_MAX_SIZE = Options.key("batch_max_rows")
         .intType()
         .defaultValue(1024)
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 54220278b..63da89d7b 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -42,7 +42,7 @@ public class StarRocksSinkFactory implements TableSinkFactory 
{
     public OptionRule optionRule() {
         return OptionRule.builder()
             .required(StarRocksOptions.USERNAME, StarRocksOptions.PASSWORD)
-            .required(StarRocksSinkOptions.DATABASE, 
StarRocksSinkOptions.QUERY_PORT)
+            .required(StarRocksSinkOptions.DATABASE, StarRocksOptions.BASE_URL)
             .required(StarRocksSinkOptions.NODE_URLS)
             .optional(StarRocksSinkOptions.TABLE, 
StarRocksSinkOptions.LABEL_PREFIX, StarRocksSinkOptions.BATCH_MAX_SIZE, 
StarRocksSinkOptions.BATCH_MAX_BYTES,
                 StarRocksSinkOptions.BATCH_INTERVAL_MS, 
StarRocksSinkOptions.MAX_RETRIES, StarRocksSinkOptions.MAX_RETRY_BACKOFF_MS,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 81b9df55f..c38302e0e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -132,15 +132,16 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
             this.prepareCloseBarrierId.set(barrier.getId());
         }
         final long barrierId = barrier.getId();
-        Serializable snapshotState = null;
+        Serializable snapshotState;
+        byte[] serialize = null;
         synchronized (enumeratorContext) {
             if (barrier.snapshot()) {
                 snapshotState = enumerator.snapshotState(barrierId);
+                serialize = enumeratorStateSerializer.serialize(snapshotState);
             }
             sendToAllReader(location -> new BarrierFlowOperation(barrier, 
location));
         }
         if (barrier.snapshot()) {
-            byte[] serialize = 
enumeratorStateSerializer.serialize(snapshotState);
             this.getExecutionContext().sendToMaster(new 
TaskAcknowledgeOperation(this.taskLocation, (CheckpointBarrier) barrier,
                 Collections.singletonList(new 
ActionSubtaskState(source.getId(), -1, Collections.singletonList(serialize)))));
         }

Reply via email to