This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dde4f7bcb [INLONG-10543][Manager] The delimiter and other 
configurations in CLS and ES sink are obtained from the stream (#10545)
1dde4f7bcb is described below

commit 1dde4f7bcb4a7c91e03bcfb213baa44dbbdbaab4
Author: fuweng11 <[email protected]>
AuthorDate: Mon Jul 1 12:35:26 2024 +0800

    [INLONG-10543][Manager] The delimiter and other configurations in CLS and 
ES sink are obtained from the stream (#10545)
---
 .../org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java    | 3 +++
 .../inlong/manager/service/sink/es/ElasticsearchSinkOperator.java      | 3 +++
 2 files changed, 6 insertions(+)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
index 0012a42d43..ab6c54fa51 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
@@ -152,6 +152,9 @@ public class ClsSinkOperator extends AbstractSinkOperator {
     public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo, StreamSink sink) {
         ClsSink clsSink = (ClsSink) sink;
         ClsSinkConfig sinkConfig = CommonBeanUtils.copyProperties(clsSink, 
ClsSinkConfig::new);
+        sinkConfig.setSeparator(String.valueOf((char) 
(Integer.parseInt(streamInfo.getDataSeparator()))));
+        sinkConfig.setFieldOffset(streamInfo.getExtendedFieldSize());
+        sinkConfig.setContentOffset(0);
         List<FieldConfig> fields = 
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
                 v -> {
                     FieldConfig fieldConfig = new FieldConfig();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index 753ae75aa6..d33062d34f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -201,6 +201,9 @@ public class ElasticsearchSinkOperator extends 
AbstractSinkOperator {
         ElasticsearchSinkDTO elasticsearchSinkDTO = 
ElasticsearchSinkDTO.getFromJson(streamSinkEntity.getExtParams());
         EsSinkConfig sinkConfig = 
CommonBeanUtils.copyProperties(elasticsearchSink, EsSinkConfig::new);
         CommonBeanUtils.copyProperties(elasticsearchSinkDTO, sinkConfig);
+        sinkConfig.setSeparator(String.valueOf((char) 
(Integer.parseInt(streamInfo.getDataSeparator()))));
+        sinkConfig.setFieldOffset(streamInfo.getExtendedFieldSize());
+        sinkConfig.setContentOffset(0);
         List<FieldConfig> fields = 
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
                 v -> {
                     FieldConfig fieldConfig = new FieldConfig();

Reply via email to