This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
new 8d5712c1d [Improve] Improve StarRocks Sink Config (#4212)
8d5712c1d is described below
commit 8d5712c1dbf33cddc814e8ae5b3283daff31c9be
Author: Hisoka <[email protected]>
AuthorDate: Fri Feb 24 22:26:15 2023 +0800
[Improve] Improve StarRocks Sink Config (#4212)
---
.../connectors/seatunnel/starrocks/config/SinkConfig.java | 4 +---
.../seatunnel/starrocks/config/StarRocksSinkOptions.java | 7 +------
.../connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java | 2 +-
.../seatunnel/engine/server/task/SourceSplitEnumeratorTask.java | 5 +++--
4 files changed, 6 insertions(+), 12 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 2095b9988..f00369cdf 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -64,9 +64,7 @@ public class SinkConfig implements Serializable {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setNodeUrls(config.get(StarRocksSinkOptions.NODE_URLS));
sinkConfig.setDatabase(config.get(StarRocksSinkOptions.DATABASE));
- sinkConfig.setJdbcUrl(String.format("jdbc:mysql://%s:%d",
- sinkConfig.getNodeUrls().get(0).split(":")[0],
- config.get(StarRocksSinkOptions.QUERY_PORT)));
+ sinkConfig.setJdbcUrl(config.get(StarRocksOptions.BASE_URL));
config.getOptional(StarRocksOptions.USERNAME).ifPresent(sinkConfig::setUsername);
config.getOptional(StarRocksOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
config.getOptional(StarRocksSinkOptions.TABLE).ifPresent(sinkConfig::setTable);
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index 5a00b0e76..ee89f6cc2 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -29,7 +29,7 @@ public interface StarRocksSinkOptions {
Option<List<String>> NODE_URLS = Options.key("nodeUrls")
.listType()
.noDefaultValue()
- .withDescription("StarRocks cluster address, the format is [\"fe_ip:fe_http_port\", ...]");
+ .withDescription("StarRocks cluster http address, the format is [\"fe_ip:fe_http_port\", ...]");
Option<String> LABEL_PREFIX = Options.key("labelPrefix")
.stringType()
@@ -58,11 +58,6 @@ public interface StarRocksSinkOptions {
" \"replication_num\" = \"1\" \n" +
")").withDescription("Create table statement template, used to create StarRocks table");
- Option<Integer> QUERY_PORT = Options.key("query_port")
- .intType()
- .defaultValue(9030)
- .withDescription("FE MySQL server port");
-
Option<Integer> BATCH_MAX_SIZE = Options.key("batch_max_rows")
.intType()
.defaultValue(1024)
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 54220278b..63da89d7b 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -42,7 +42,7 @@ public class StarRocksSinkFactory implements TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(StarRocksOptions.USERNAME, StarRocksOptions.PASSWORD)
- .required(StarRocksSinkOptions.DATABASE, StarRocksSinkOptions.QUERY_PORT)
+ .required(StarRocksSinkOptions.DATABASE, StarRocksOptions.BASE_URL)
.required(StarRocksSinkOptions.NODE_URLS)
.optional(StarRocksSinkOptions.TABLE,
StarRocksSinkOptions.LABEL_PREFIX, StarRocksSinkOptions.BATCH_MAX_SIZE,
StarRocksSinkOptions.BATCH_MAX_BYTES,
StarRocksSinkOptions.BATCH_INTERVAL_MS,
StarRocksSinkOptions.MAX_RETRIES, StarRocksSinkOptions.MAX_RETRY_BACKOFF_MS,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 81b9df55f..c38302e0e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -132,15 +132,16 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
this.prepareCloseBarrierId.set(barrier.getId());
}
final long barrierId = barrier.getId();
- Serializable snapshotState = null;
+ Serializable snapshotState;
+ byte[] serialize = null;
synchronized (enumeratorContext) {
if (barrier.snapshot()) {
snapshotState = enumerator.snapshotState(barrierId);
+ serialize = enumeratorStateSerializer.serialize(snapshotState);
}
sendToAllReader(location -> new BarrierFlowOperation(barrier, location));
}
if (barrier.snapshot()) {
- byte[] serialize = enumeratorStateSerializer.serialize(snapshotState);
this.getExecutionContext().sendToMaster(new
TaskAcknowledgeOperation(this.taskLocation, (CheckpointBarrier) barrier,
Collections.singletonList(new
ActionSubtaskState(source.getId(), -1, Collections.singletonList(serialize)))));
}