This is an automated email from the ASF dual-hosted git repository.

wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b729a9d1c6 [INLONG-11877][Manager] Support verifying transform SQL 
(#11878)
b729a9d1c6 is described below

commit b729a9d1c6699d7748fdf9a8cde713410ffc6400
Author: fuweng11 <[email protected]>
AuthorDate: Thu Jun 5 15:09:43 2025 +0800

    [INLONG-11877][Manager] Support verifying transform SQL (#11878)
    
    * [INLONG-11877][Manager] Support verifying transform SQL
---
 .../client/api/inner/client/StreamSinkClient.java  | 15 ++++++
 .../manager/client/api/service/StreamSinkApi.java  |  4 ++
 .../manager/pojo/sink/TransformParseRequest.java   | 63 ++++++++++++++++++++++
 .../service/datatype/CsvDataTypeOperator.java      | 55 +++++++++++++++++++
 .../manager/service/datatype/DataTypeOperator.java |  8 +++
 .../service/datatype/KvDataTypeOperator.java       | 61 +++++++++++++++++++++
 .../manager/service/sink/StreamSinkService.java    | 10 ++++
 .../service/sink/StreamSinkServiceImpl.java        | 37 +++++++++++++
 .../web/controller/StreamSinkController.java       |  6 +++
 .../openapi/OpenStreamSinkController.java          |  8 +++
 10 files changed, 267 insertions(+)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
index c350900f06..202b5c24a9 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
@@ -30,10 +30,12 @@ import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
 
 import org.apache.commons.lang3.tuple.Pair;
 
 import java.util.List;
+import java.util.Map;
 
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
@@ -171,4 +173,17 @@ public class StreamSinkClient {
         ParseFieldRequest request = 
ParseFieldRequest.builder().method(method).statement(statement).build();
         return parseFields(request);
     }
+
+    /**
+     * Parse transform sql for data
+     *
+     * @param transformParseRequest the request for parse transform
+     * @return the transform result as a map
+     */
+    public Map<String, Object> parseTransform(TransformParseRequest 
transformParseRequest) {
+        Response<Map<String, Object>> response =
+                
ClientUtils.executeHttpCall(streamSinkApi.parseTransform(transformParseRequest));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
 }
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
index 75088a5eee..168ef85d0f 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
 
 import retrofit2.Call;
 import retrofit2.http.Body;
@@ -36,6 +37,7 @@ import retrofit2.http.Path;
 import retrofit2.http.Query;
 
 import java.util.List;
+import java.util.Map;
 
 public interface StreamSinkApi {
 
@@ -67,4 +69,6 @@ public interface StreamSinkApi {
     @POST("sink/parseFields")
     Call<Response<List<SinkField>>> parseFields(@Body ParseFieldRequest 
parseFieldRequest);
 
+    @POST("sink/parseTransform")
+    Call<Response<Map<String, Object>>> parseTransform(@Body 
TransformParseRequest request);
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/TransformParseRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/TransformParseRequest.java
new file mode 100644
index 0000000000..94d9d79615
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/TransformParseRequest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink;
+
+import org.apache.inlong.manager.common.validation.SaveValidation;
+import org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.hibernate.validator.constraints.Length;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+import java.util.List;
+
+/**
+ * Request for parsing transform SQL
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@ApiModel("Transform parse request")
+public class TransformParseRequest {
+
+    @ApiModelProperty("Inlong group id")
+    @NotBlank(groups = {SaveValidation.class, UpdateByKeyValidation.class}, 
message = "inlongGroupId cannot be blank")
+    @Length(min = 4, max = 256, message = "length must be between 4 and 200")
+    @Pattern(regexp = "^[a-zA-Z0-9_.-]{4,200}$", message = "only supports 
letters, numbers, '.', '-', or '_'")
+    private String inlongGroupId;
+
+    @ApiModelProperty("Inlong stream id")
+    @NotBlank(groups = {SaveValidation.class, UpdateByKeyValidation.class}, 
message = "inlongStreamId cannot be blank")
+    @Length(min = 1, max = 256, message = "inlongStreamId length must be 
between 1 and 200")
+    @Pattern(regexp = "^[a-zA-Z0-9_.-]{1,200}$", message = "inlongStreamId 
only supports letters, numbers, '.', '-', or '_'")
+    private String inlongStreamId;
+
+    @ApiModelProperty("Transform sql")
+    private String transformSql;
+
+    @ApiModelProperty("Data")
+    private String data;
+
+    @ApiModelProperty("Sink field list")
+    private List<SinkField> sinkFieldList;
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
index 4f12339014..f0aaa79713 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
@@ -20,16 +20,28 @@ package org.apache.inlong.manager.service.datatype;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
 import org.apache.inlong.sdk.transform.decode.SplitUtils;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 @Slf4j
 @Service
@@ -84,4 +96,47 @@ public class CsvDataTypeOperator implements DataTypeOperator 
{
         csvConfig.setEscapeChar(escape);
         return csvConfig;
     }
+
+    @Override
+    public Map<String, Object> parseTransform(InlongStreamInfo streamInfo, 
List<SinkField> fieldList,
+            String transformSql,
+            String data) {
+        try {
+            List<org.apache.inlong.sdk.transform.pojo.FieldInfo> srcFields = 
new ArrayList<>();
+            List<org.apache.inlong.sdk.transform.pojo.FieldInfo> dstFields = 
new ArrayList<>();
+            for (StreamField streamField : streamInfo.getFieldList()) {
+                if (StringUtils.isNotBlank(streamField.getFieldName())) {
+                    srcFields.add(
+                            new 
org.apache.inlong.sdk.transform.pojo.FieldInfo(streamField.getFieldName(),
+                                    TypeConverter.DefaultTypeConverter()));
+                }
+            }
+            for (SinkField sinkField : fieldList) {
+                String targetFieldName = sinkField.getFieldName();
+                if (StringUtils.isNotBlank(targetFieldName)) {
+                    dstFields.add(new 
org.apache.inlong.sdk.transform.pojo.FieldInfo(targetFieldName));
+                }
+            }
+            char separator = '&';
+            if (StringUtils.isNotBlank(streamInfo.getDataSeparator())) {
+                separator = (char) 
Integer.parseInt(streamInfo.getDataSeparator());
+            }
+            Character escape = null;
+            if (StringUtils.isNotBlank(streamInfo.getDataEscapeChar())) {
+                escape = streamInfo.getDataEscapeChar().charAt(0);
+            }
+            CsvSourceInfo csvSource = new 
CsvSourceInfo(streamInfo.getDataEncoding(), separator, escape, srcFields);
+            MapSinkInfo mapSinkInfo = new 
MapSinkInfo(streamInfo.getDataEncoding(), dstFields);
+            TransformConfig config = new TransformConfig(transformSql);
+            TransformProcessor<String, Map<String, Object>> processor = 
TransformProcessor
+                    .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                            SinkEncoderFactory.createMapEncoder(mapSinkInfo));
+            List<Map<String, Object>> result = processor.transform(data);
+            log.info("success parse transform sql result={}", result);
+            return result.get(0);
+        } catch (Exception e) {
+            log.error("parse transform sql failed", e);
+            throw new BusinessException("parse transform sql failed" + 
e.getMessage());
+        }
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
index ad1a577473..7919a56a6d 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
@@ -22,9 +22,11 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Data type operator
@@ -51,4 +53,10 @@ public interface DataTypeOperator {
                 String.format("current type is not support for data type=%s", 
streamInfo.getDataType()));
     }
 
+    default Map<String, Object> parseTransform(InlongStreamInfo streamInfo, 
List<SinkField> fieldList,
+            String transformSql, String data) {
+        throw new BusinessException(
+                String.format("current type is not support for data type=%s", 
streamInfo.getDataType()));
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
index 900732f906..a2957baace 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
@@ -20,15 +20,26 @@ package org.apache.inlong.manager.service.datatype;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.sdk.transform.decode.KvUtils;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -101,4 +112,54 @@ public class KvDataTypeOperator implements 
DataTypeOperator {
         kvConfig.setEscapeChar(escape);
         return kvConfig;
     }
+
+    @Override
+    public Map<String, Object> parseTransform(InlongStreamInfo streamInfo, 
List<SinkField> fieldList,
+            String transformSql,
+            String data) {
+        try {
+            List<org.apache.inlong.sdk.transform.pojo.FieldInfo> srcFields = 
new ArrayList<>();
+            List<org.apache.inlong.sdk.transform.pojo.FieldInfo> dstFields = 
new ArrayList<>();
+            for (StreamField streamField : streamInfo.getFieldList()) {
+                if (StringUtils.isNotBlank(streamField.getFieldName())) {
+                    srcFields.add(
+                            new 
org.apache.inlong.sdk.transform.pojo.FieldInfo(streamField.getFieldName(),
+                                    TypeConverter.DefaultTypeConverter()));
+                }
+            }
+            for (SinkField sinkField : fieldList) {
+                String targetFieldName = sinkField.getFieldName();
+                if (StringUtils.isNotBlank(targetFieldName)) {
+                    dstFields.add(new 
org.apache.inlong.sdk.transform.pojo.FieldInfo(targetFieldName));
+                }
+            }
+            char separator = '&';
+            if (StringUtils.isNotBlank(streamInfo.getDataSeparator())) {
+                separator = (char) 
Integer.parseInt(streamInfo.getDataSeparator());
+            }
+            Character escape = null;
+            if (StringUtils.isNotBlank(streamInfo.getDataEscapeChar())) {
+                escape = streamInfo.getDataEscapeChar().charAt(0);
+            }
+            char kvSeparator = '=';
+            if (StringUtils.isNotBlank(streamInfo.getKvSeparator())) {
+                kvSeparator = (char) 
Integer.parseInt(streamInfo.getKvSeparator());
+            }
+            KvSourceInfo kvSourceInfo = new 
KvSourceInfo(streamInfo.getDataEncoding(), srcFields);
+            kvSourceInfo.setEscapeChar(escape);
+            kvSourceInfo.setEntryDelimiter(separator);
+            kvSourceInfo.setKvDelimiter(kvSeparator);
+            MapSinkInfo mapSinkInfo = new 
MapSinkInfo(streamInfo.getDataEncoding(), dstFields);
+            TransformConfig config = new TransformConfig(transformSql);
+            TransformProcessor<String, Map<String, Object>> processor = 
TransformProcessor
+                    .create(config, 
SourceDecoderFactory.createKvDecoder(kvSourceInfo),
+                            SinkEncoderFactory.createMapEncoder(mapSinkInfo));
+            List<Map<String, Object>> result = processor.transform(data);
+            log.info("success parse transform sql result={}", result);
+            return result.get(0);
+        } catch (Exception e) {
+            log.error("parse transform sql failed", e);
+            throw new BusinessException("parse transform sql failed" + 
e.getMessage());
+        }
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index a0b6a29443..bd6d915227 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.user.UserInfo;
 
@@ -247,4 +248,13 @@ public interface StreamSinkService {
      * @return list of sink field
      */
     List<SinkField> parseFields(ParseFieldRequest parseFieldRequest);
+
+    /**
+     * Parse transform sql for data
+     *
+     * @param request the request for parse transform
+     * @return the transform result as a map
+     */
+    Map<String, Object> parseTransform(TransformParseRequest request);
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 431d44b3d0..e306c1f6f1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.sink;
 
 import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.OperationTarget;
@@ -32,12 +33,14 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
 import org.apache.inlong.manager.dao.entity.SortConfigEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
 import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
@@ -59,8 +62,12 @@ import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.user.UserInfo;
+import org.apache.inlong.manager.service.datatype.DataTypeOperator;
+import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
 import org.apache.inlong.manager.service.group.GroupCheckService;
 import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
 import org.apache.inlong.manager.service.user.UserService;
@@ -106,6 +113,7 @@ import static 
org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NO
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV;
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+import static 
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
 import static 
org.apache.inlong.manager.service.resource.queue.pulsar.PulsarQueueResourceOperator.PULSAR_SUBSCRIPTION;
 import static 
org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQQueueResourceOperator.TUBE_CONSUMER_GROUP;
 
@@ -130,12 +138,16 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
     @Autowired
     private InlongStreamEntityMapper streamMapper;
     @Autowired
+    private InlongStreamFieldEntityMapper streamFieldMapper;
+    @Autowired
     private InlongGroupEntityMapper groupMapper;
     @Autowired
     private StreamSinkEntityMapper sinkMapper;
     @Autowired
     private StreamSinkFieldEntityMapper sinkFieldMapper;
     @Autowired
+    public DataTypeOperatorFactory dataTypeOperatorFactory;
+    @Autowired
     private AutowireCapableBeanFactory autowireCapableBeanFactory;
     @Autowired
     private ObjectMapper objectMapper;
@@ -746,6 +758,31 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         }
     }
 
+    @Override
+    public Map<String, Object> parseTransform(TransformParseRequest request) {
+        LOGGER.info("begin to parse transform for data: {}", request);
+        // Check whether the stream exists or not
+        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
+                request.getInlongGroupId(), request.getInlongStreamId());
+        Preconditions.expectNotNull(streamEntity, 
ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
+        
Preconditions.expectTrue(CollectionUtils.isNotEmpty(request.getSinkFieldList()),
+                ErrorCodeEnum.SINK_FIELD_LIST_IS_EMPTY.getMessage());
+        Preconditions.expectNotBlank(request.getTransformSql(), "Transform sql 
is empty");
+        DataTypeOperator dataTypeOperator =
+                
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamEntity.getDataType()));
+        InlongStreamInfo streamInfo = 
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
+        unpackExtParams(streamEntity.getExtParams(), streamInfo);
+        List<InlongStreamFieldEntity> fieldEntityList =
+                
streamFieldMapper.selectByIdentifier(request.getInlongGroupId(), 
request.getInlongStreamId());
+        List<StreamField> streamFields = 
CommonBeanUtils.copyListProperties(fieldEntityList, StreamField::new);
+        streamInfo.setFieldList(streamFields);
+        Map<String, Object> result =
+                dataTypeOperator.parseTransform(streamInfo, 
request.getSinkFieldList(), request.getTransformSql(),
+                        request.getData());
+        LOGGER.info("success to parse transform for data: {}, result={}", 
request, result);
+        return result;
+    }
+
     private List<SinkField> parseFieldsByCsv(String statement) {
         String[] lines = statement.split(InlongConstants.NEW_LINE);
         List<SinkField> fields = new ArrayList<>();
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 331e3a5355..588dfec8fc 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
 import org.apache.inlong.manager.pojo.user.LoginUserUtils;
 import org.apache.inlong.manager.service.dirtyData.DirtyQueryLogService;
 import org.apache.inlong.manager.service.operationlog.OperationLog;
@@ -181,4 +182,9 @@ public class StreamSinkController {
         return Response.success(dirtyQueryLogService.getSqlTaskStatus(taskId));
     }
 
+    @RequestMapping(value = "/sink/parseTransform", method = 
RequestMethod.POST)
+    @ApiOperation(value = "parse transform sql from data")
+    public Response<Map<String, Object>> parseTransform(@RequestBody 
TransformParseRequest request) {
+        return Response.success(sinkService.parseTransform(request));
+    }
 }
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
index e999e1ba28..1c313f77e3 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
 import org.apache.inlong.manager.pojo.user.LoginUserUtils;
 import org.apache.inlong.manager.service.operationlog.OperationLog;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
@@ -45,6 +46,7 @@ import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Open InLong Stream Sink controller
@@ -113,4 +115,10 @@ public class OpenStreamSinkController {
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), 
ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(sinkService.delete(id, startProcess, 
LoginUserUtils.getLoginUser().getName()));
     }
+
+    @RequestMapping(value = "/sink/parseTransform", method = 
RequestMethod.POST)
+    @ApiOperation(value = "parse transform sql from data")
+    public Response<Map<String, Object>> parseTransform(@RequestBody 
TransformParseRequest request) {
+        return Response.success(sinkService.parseTransform(request));
+    }
 }

Reply via email to