This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1dde4f7bcb [INLONG-10543][Manager] The delimiter and other
configurations in CLS and ES sink are obtained from the stream (#10545)
1dde4f7bcb is described below
commit 1dde4f7bcb4a7c91e03bcfb213baa44dbbdbaab4
Author: fuweng11 <[email protected]>
AuthorDate: Mon Jul 1 12:35:26 2024 +0800
[INLONG-10543][Manager] The delimiter and other configurations in CLS and
ES sink are obtained from the stream (#10545)
---
.../org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java | 3 +++
.../inlong/manager/service/sink/es/ElasticsearchSinkOperator.java | 3 +++
2 files changed, 6 insertions(+)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
index 0012a42d43..ab6c54fa51 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
@@ -152,6 +152,9 @@ public class ClsSinkOperator extends AbstractSinkOperator {
public SinkConfig getSinkConfig(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo, StreamSink sink) {
ClsSink clsSink = (ClsSink) sink;
ClsSinkConfig sinkConfig = CommonBeanUtils.copyProperties(clsSink,
ClsSinkConfig::new);
+ sinkConfig.setSeparator(String.valueOf((char)
(Integer.parseInt(streamInfo.getDataSeparator()))));
+ sinkConfig.setFieldOffset(streamInfo.getExtendedFieldSize());
+ sinkConfig.setContentOffset(0);
List<FieldConfig> fields =
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
v -> {
FieldConfig fieldConfig = new FieldConfig();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index 753ae75aa6..d33062d34f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -201,6 +201,9 @@ public class ElasticsearchSinkOperator extends AbstractSinkOperator {
ElasticsearchSinkDTO elasticsearchSinkDTO =
ElasticsearchSinkDTO.getFromJson(streamSinkEntity.getExtParams());
EsSinkConfig sinkConfig =
CommonBeanUtils.copyProperties(elasticsearchSink, EsSinkConfig::new);
CommonBeanUtils.copyProperties(elasticsearchSinkDTO, sinkConfig);
+ sinkConfig.setSeparator(String.valueOf((char)
(Integer.parseInt(streamInfo.getDataSeparator()))));
+ sinkConfig.setFieldOffset(streamInfo.getExtendedFieldSize());
+ sinkConfig.setContentOffset(0);
List<FieldConfig> fields =
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
v -> {
FieldConfig fieldConfig = new FieldConfig();