This is an automated email from the ASF dual-hosted git repository.

wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 03c23fd527 [INLONG-11879][Manager] Support parsing transform 
configuration as transform (#11880)
03c23fd527 is described below

commit 03c23fd527b8a38ade53816923b5eeed2380819d
Author: fuweng11 <[email protected]>
AuthorDate: Mon Jun 9 10:31:00 2025 +0800

    [INLONG-11879][Manager] Support parsing transform configuration as 
transform (#11880)
---
 .../api/inner/client/StreamTransformClient.java    |  9 +++
 .../client/api/service/StreamTransformApi.java     |  3 +
 .../pojo/sort/util/FilterFunctionUtils.java        | 12 ++-
 .../service/transform/StreamTransformService.java  | 18 +++++
 .../transform/StreamTransformServiceImpl.java      | 55 +++++++++++++
 .../service/transform/TransformSqlParser.java      | 91 ++++++++++++++++++++++
 .../web/controller/StreamTransformController.java  |  7 ++
 .../openapi/OpenStreamTransformController.java     |  7 ++
 8 files changed, 199 insertions(+), 3 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
index 9d7fb2b5a3..25ea377750 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
@@ -90,4 +90,13 @@ public class StreamTransformClient {
         ClientUtils.assertRespSuccess(response);
         return response.getData();
     }
+
+    /**
+     * Parse transform sql
+     */
+    public String parseTransformSql(TransformRequest transformRequest) {
+        Response<String> response = 
ClientUtils.executeHttpCall(streamTransformApi.parseTransformSql(transformRequest));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
 }
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamTransformApi.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamTransformApi.java
index bd9c686e5d..edd47686c8 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamTransformApi.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamTransformApi.java
@@ -44,4 +44,7 @@ public interface StreamTransformApi {
     Call<Response<Boolean>> deleteTransform(@Query("inlongGroupId") String 
groupId,
             @Query("inlongStreamId") String streamId, @Query("transformName") 
String transformName);
 
+    @POST("transform/parseTransformSql")
+    Call<Response<String>> parseTransformSql(@Body TransformRequest request);
+
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
index c66dccd511..b5aab47d42 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.pojo.sort.util;
 
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.StringTypeInfo;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.TransformType;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -38,6 +40,7 @@ import 
org.apache.inlong.sort.protocol.transformation.FunctionParam;
 import org.apache.inlong.sort.protocol.transformation.LogicOperator;
 import 
org.apache.inlong.sort.protocol.transformation.MultiValueCompareOperator;
 import 
org.apache.inlong.sort.protocol.transformation.SingleValueCompareOperator;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
 import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
@@ -204,12 +207,15 @@ public class FilterFunctionUtils {
             return new ConstantParam("");
         }
         boolean isConstant = value.isConstant();
+        StreamField targetField = value.getTargetField();
+        String fieldType = targetField.getFieldType();
         if (isConstant) {
+            FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
+                    targetField.getFieldType(), targetField.getFieldFormat());
             String constant = value.getTargetConstant();
-            return new ConstantParam(constant);
+            return formatInfo.getTypeInfo() == StringTypeInfo.INSTANCE ? new 
StringConstantParam(constant)
+                    : new ConstantParam(constant);
         } else {
-            StreamField targetField = value.getTargetField();
-            String fieldType = targetField.getFieldType();
             String fieldFormat = targetField.getFieldFormat();
             String fieldName = targetField.getFieldName();
             if (FieldType.FUNCTION.name().equalsIgnoreCase(fieldType)) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
index b8d10ebb92..c4506b2eb0 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.transform;
 
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.transform.DeleteTransformRequest;
 import org.apache.inlong.manager.pojo.transform.TransformPageRequest;
 import org.apache.inlong.manager.pojo.transform.TransformRequest;
@@ -83,4 +84,21 @@ public interface StreamTransformService {
      * @return Whether succeed
      */
     Boolean delete(DeleteTransformRequest request, String operator);
+
+    /**
+     * Get transform sql by sink
+     *
+     * @param sink sink info
+     * @return return transform sql
+     */
+    String getTransformSql(StreamSink sink);
+
+    /**
+     * Parse transform sql by transform information
+     *
+     * @param request the transform request
+     * @return return transform sql
+     */
+    String parseTransformSql(TransformRequest request, String operator);
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 40269450a9..42df71702f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -23,12 +23,17 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.entity.StreamTransformEntity;
 import org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamTransformFieldEntityMapper;
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.transform.DeleteTransformRequest;
 import org.apache.inlong.manager.pojo.transform.TransformPageRequest;
@@ -36,9 +41,11 @@ import 
org.apache.inlong.manager.pojo.transform.TransformRequest;
 import org.apache.inlong.manager.pojo.transform.TransformResponse;
 import org.apache.inlong.manager.pojo.user.UserInfo;
 import org.apache.inlong.manager.service.group.GroupCheckService;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.user.UserService;
 
 import com.github.pagehelper.PageHelper;
+import com.google.common.collect.Sets;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -53,6 +60,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -70,8 +78,12 @@ public class StreamTransformServiceImpl implements 
StreamTransformService {
     @Autowired
     protected StreamTransformFieldEntityMapper transformFieldMapper;
     @Autowired
+    private StreamSinkFieldEntityMapper sinkFieldEntityMapper;
+    @Autowired
     protected GroupCheckService groupCheckService;
     @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
     protected UserService userService;
 
     @Override
@@ -98,6 +110,7 @@ public class StreamTransformServiceImpl implements 
StreamTransformService {
         transformEntity.setModifier(operator);
         transformMapper.insert(transformEntity);
         saveFieldOpt(transformEntity, request.getFieldList());
+        saveTransformSql(request, operator);
         return transformEntity.getId();
     }
 
@@ -183,6 +196,7 @@ public class StreamTransformServiceImpl implements 
StreamTransformService {
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
         updateFieldOpt(transformEntity, request.getFieldList());
+        saveTransformSql(request, operator);
         return true;
     }
 
@@ -341,4 +355,45 @@ public class StreamTransformServiceImpl implements 
StreamTransformService {
         transformFieldMapper.insertAll(entityList);
         LOGGER.debug("success to save transform fields");
     }
+
+    @Override
+    public String getTransformSql(StreamSink sink) {
+        List<TransformResponse> transformResponseList =
+                listTransform(sink.getInlongGroupId(), 
sink.getInlongStreamId());
+        List<TransformResponse> filterList = transformResponseList.stream()
+                .filter(v -> {
+                    Set<String> postNodes = 
Sets.newHashSet(v.getPostNodeNames().split(InlongConstants.COMMA));
+                    return postNodes.contains(sink.getSinkName());
+                }).collect(
+                        Collectors.toList());
+        List<StreamSinkFieldEntity> sinkFieldEntityList = 
sinkFieldEntityMapper.selectBySinkId(sink.getId());
+        return TransformSqlParser.parse(filterList,
+                CommonBeanUtils.copyListProperties(sinkFieldEntityList, 
SinkField::new));
+    }
+
+    @Override
+    public String parseTransformSql(TransformRequest request, String operator) 
{
+        List<SinkField> sinkFields = 
CommonBeanUtils.copyListProperties(request.getFieldList(), SinkField::new);
+        sinkFields.forEach(v -> v.setSourceFieldName(v.getOriginFieldName()));
+        List<TransformResponse> filterList =
+                
Collections.singletonList(CommonBeanUtils.copyProperties(request, 
TransformResponse::new));
+        return TransformSqlParser.parse(filterList,
+                CommonBeanUtils.copyListProperties(sinkFields, 
SinkField::new));
+    }
+
+    private void saveTransformSql(TransformRequest request, String operator) {
+        LOGGER.info("begin to save transform sql: {}", request);
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        if 
(InlongConstants.STANDARD_MODE.equals(groupEntity.getInlongGroupMode())) {
+            Set<String> postNodes = 
Sets.newHashSet(request.getPostNodeNames().split(InlongConstants.COMMA));
+            List<StreamSink> sinkList = sinkService.listSink(groupId, 
streamId);
+            sinkList.stream().filter(v -> 
postNodes.contains(v.getSinkName())).forEach(v -> {
+                SinkRequest sinkRequest = v.genSinkRequest();
+                sinkRequest.setTransformSql(getTransformSql(v));
+                sinkService.update(sinkRequest, operator);
+            });
+        }
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/TransformSqlParser.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/TransformSqlParser.java
new file mode 100644
index 0000000000..5bac783cb1
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/TransformSqlParser.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.transform;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sort.util.FilterFunctionUtils;
+import org.apache.inlong.manager.pojo.sort.util.StreamParseUtils;
+import org.apache.inlong.manager.pojo.transform.TransformDefinition;
+import org.apache.inlong.manager.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.pojo.transform.filter.FilterDefinition;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class TransformSqlParser {
+
+    private static final Logger log = 
LoggerFactory.getLogger(TransformSqlParser.class);
+
+    public static String parse(List<TransformResponse> transformResponseList, 
List<SinkField> sinkFields) {
+        log.info("start to parse transform sql for transform list={}", 
transformResponseList);
+        StringBuilder result = new StringBuilder()
+                .append(genSimpleSelectSql(sinkFields))
+                .append(getFilterSql(transformResponseList));
+        log.info("success to parse transform for transform list={}, 
result={}", transformResponseList, result);
+        return result.toString().replaceAll("`", "");
+    }
+
+    private static StringBuilder genSimpleSelectSql(List<SinkField> fields) {
+        StringBuilder sb = new StringBuilder("SELECT ");
+        for (SinkField field : fields) {
+            sb.append("`").append(field.getSourceFieldName()).append("`,");
+        }
+        sb.deleteCharAt(sb.length() - 1).append(" FROM SOURCE");
+        return sb;
+    }
+
+    private static StringBuilder getFilterSql(List<TransformResponse> 
filterList) {
+        StringBuilder stringBuilder = new StringBuilder();
+        for (TransformResponse transformResponse : filterList) {
+            TransformType transformType = 
TransformType.forType(transformResponse.getTransformType());
+            TransformDefinition transformDefinition = 
StreamParseUtils.parseTransformDefinition(
+                    transformResponse.getTransformDefinition(), transformType);
+            String transformName = transformResponse.getTransformName();
+            switch (transformType) {
+                case FILTER:
+                    stringBuilder.append(" WHERE");
+                    FilterDefinition filterDefinition = (FilterDefinition) 
transformDefinition;
+                    List<FilterFunction> filterFunctions =
+                            
FilterFunctionUtils.createFilterFunctions(filterDefinition, transformName);
+                    for (FilterFunction filterFunction : filterFunctions) {
+                        
stringBuilder.append(InlongConstants.BLANK).append(filterFunction.format());
+                    }
+                    break;
+                case SPLITTER:
+                case STRING_REPLACER:
+                case ENCRYPT:
+                case DE_DUPLICATION:
+                case JOINER:
+                case LOOKUP_JOINER:
+                case TEMPORAL_JOINER:
+                case INTERVAL_JOINER:
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            String.format("Unsupported transformType=%s", 
transformType));
+            }
+        }
+        return stringBuilder;
+    }
+
+}
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java
index dc7ec96b4e..6b9533e963 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java
@@ -104,4 +104,11 @@ public class StreamTransformController {
         return 
Response.success(transformFunctionDocService.listByCondition(request));
     }
 
+    @RequestMapping(value = "/transform/parseTransformSql", method = 
RequestMethod.POST)
+    @ApiOperation(value = "Parse stream transform sql")
+    public Response<String> parseTransformSql(@Validated @RequestBody 
TransformRequest request) {
+        return Response.success(
+                streamTransformService.parseTransformSql(request, 
LoginUserUtils.getLoginUser().getName()));
+    }
+
 }
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
index ce22dc2010..2962452190 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
@@ -105,4 +105,11 @@ public class OpenStreamTransformController {
             @Validated @RequestBody TransformFunctionDocRequest request) {
         return 
Response.success(transformFunctionDocService.listByCondition(request));
     }
+
+    @RequestMapping(value = "/transform/parseTransformSql", method = 
RequestMethod.POST)
+    @ApiOperation(value = "Parse stream transform sql")
+    public Response<String> parseTransformSql(@Validated @RequestBody 
TransformRequest request) {
+        return Response.success(
+                streamTransformService.parseTransformSql(request, 
LoginUserUtils.getLoginUser().getName()));
+    }
 }

Reply via email to