This is an automated email from the ASF dual-hosted git repository.
wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 03c23fd527 [INLONG-11879][Manager] Support parsing transform
configuration as transform (#11880)
03c23fd527 is described below
commit 03c23fd527b8a38ade53816923b5eeed2380819d
Author: fuweng11 <[email protected]>
AuthorDate: Mon Jun 9 10:31:00 2025 +0800
[INLONG-11879][Manager] Support parsing transform configuration as
transform (#11880)
---
.../api/inner/client/StreamTransformClient.java | 9 +++
.../client/api/service/StreamTransformApi.java | 3 +
.../pojo/sort/util/FilterFunctionUtils.java | 12 ++-
.../service/transform/StreamTransformService.java | 18 +++++
.../transform/StreamTransformServiceImpl.java | 55 +++++++++++++
.../service/transform/TransformSqlParser.java | 91 ++++++++++++++++++++++
.../web/controller/StreamTransformController.java | 7 ++
.../openapi/OpenStreamTransformController.java | 7 ++
8 files changed, 199 insertions(+), 3 deletions(-)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
index 9d7fb2b5a3..25ea377750 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
@@ -90,4 +90,13 @@ public class StreamTransformClient {
ClientUtils.assertRespSuccess(response);
return response.getData();
}
+
+    /**
+     * Sends the transform configuration to the manager's parse-transform-SQL API and returns the
+     * string carried in the response.
+     *
+     * <p>The response is checked with {@code ClientUtils.assertRespSuccess} before its data is read.
+     *
+     * @param transformRequest transform configuration to convert
+     * @return the data of the response
+     */
+    public String parseTransformSql(TransformRequest transformRequest) {
+        final Response<String> sqlResponse = ClientUtils.executeHttpCall(
+                streamTransformApi.parseTransformSql(transformRequest));
+        ClientUtils.assertRespSuccess(sqlResponse);
+        return sqlResponse.getData();
+    }
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamTransformApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamTransformApi.java
index bd9c686e5d..edd47686c8 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamTransformApi.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamTransformApi.java
@@ -44,4 +44,7 @@ public interface StreamTransformApi {
Call<Response<Boolean>> deleteTransform(@Query("inlongGroupId") String
groupId,
@Query("inlongStreamId") String streamId, @Query("transformName")
String transformName);
+ /**
+ * Posts a transform configuration to {@code transform/parseTransformSql}.
+ *
+ * @param request transform configuration sent as the request body
+ * @return call whose response data is a string (presumably the generated transform SQL - confirm against the server side)
+ */
+ @POST("transform/parseTransformSql")
+ Call<Response<String>> parseTransformSql(@Body TransformRequest request);
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
index c66dccd511..b5aab47d42 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.pojo.sort.util;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.StringTypeInfo;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.TransformType;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -38,6 +40,7 @@ import
org.apache.inlong.sort.protocol.transformation.FunctionParam;
import org.apache.inlong.sort.protocol.transformation.LogicOperator;
import
org.apache.inlong.sort.protocol.transformation.MultiValueCompareOperator;
import
org.apache.inlong.sort.protocol.transformation.SingleValueCompareOperator;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
import
org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
@@ -204,12 +207,15 @@ public class FilterFunctionUtils {
return new ConstantParam("");
}
boolean isConstant = value.isConstant();
+ StreamField targetField = value.getTargetField();
+ String fieldType = targetField.getFieldType();
if (isConstant) {
+ FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
+ targetField.getFieldType(), targetField.getFieldFormat());
String constant = value.getTargetConstant();
- return new ConstantParam(constant);
+ return formatInfo.getTypeInfo() == StringTypeInfo.INSTANCE ? new
StringConstantParam(constant)
+ : new ConstantParam(constant);
} else {
- StreamField targetField = value.getTargetField();
- String fieldType = targetField.getFieldType();
String fieldFormat = targetField.getFieldFormat();
String fieldName = targetField.getFieldName();
if (FieldType.FUNCTION.name().equalsIgnoreCase(fieldType)) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
index b8d10ebb92..c4506b2eb0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.transform;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.transform.DeleteTransformRequest;
import org.apache.inlong.manager.pojo.transform.TransformPageRequest;
import org.apache.inlong.manager.pojo.transform.TransformRequest;
@@ -83,4 +84,21 @@ public interface StreamTransformService {
* @return Whether succeed
*/
Boolean delete(DeleteTransformRequest request, String operator);
+
+ /**
+ * Get the transform SQL for the given sink.
+ *
+ * @param sink sink info
+ * @return the transform SQL
+ */
+ String getTransformSql(StreamSink sink);
+
+ /**
+ * Parse the given transform information into transform SQL.
+ *
+ * @param request the transform request
+ * @param operator name of the operator issuing the request
+ * @return the transform SQL
+ */
+ String parseTransformSql(TransformRequest request, String operator);
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 40269450a9..42df71702f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -23,12 +23,17 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.entity.StreamTransformEntity;
import org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamTransformFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.transform.DeleteTransformRequest;
import org.apache.inlong.manager.pojo.transform.TransformPageRequest;
@@ -36,9 +41,11 @@ import
org.apache.inlong.manager.pojo.transform.TransformRequest;
import org.apache.inlong.manager.pojo.transform.TransformResponse;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.group.GroupCheckService;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.user.UserService;
import com.github.pagehelper.PageHelper;
+import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -53,6 +60,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -70,8 +78,12 @@ public class StreamTransformServiceImpl implements
StreamTransformService {
@Autowired
protected StreamTransformFieldEntityMapper transformFieldMapper;
@Autowired
+ private StreamSinkFieldEntityMapper sinkFieldEntityMapper;
+ @Autowired
protected GroupCheckService groupCheckService;
@Autowired
+ private StreamSinkService sinkService;
+ @Autowired
protected UserService userService;
@Override
@@ -98,6 +110,7 @@ public class StreamTransformServiceImpl implements
StreamTransformService {
transformEntity.setModifier(operator);
transformMapper.insert(transformEntity);
saveFieldOpt(transformEntity, request.getFieldList());
+ saveTransformSql(request, operator);
return transformEntity.getId();
}
@@ -183,6 +196,7 @@ public class StreamTransformServiceImpl implements
StreamTransformService {
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
updateFieldOpt(transformEntity, request.getFieldList());
+ saveTransformSql(request, operator);
return true;
}
@@ -341,4 +355,45 @@ public class StreamTransformServiceImpl implements
StreamTransformService {
transformFieldMapper.insertAll(entityList);
LOGGER.debug("success to save transform fields");
}
+
+    /**
+     * Builds the transform SQL for one sink.
+     *
+     * <p>Loads the transforms of the sink's group and stream, keeps those whose comma-separated post node
+     * names contain the sink name, and hands them together with the sink's stored fields to
+     * {@code TransformSqlParser.parse}.
+     *
+     * @param sink sink whose group id, stream id, sink name and id are read
+     * @return the SQL text produced by {@code TransformSqlParser.parse}
+     */
+    @Override
+    public String getTransformSql(StreamSink sink) {
+        String sinkName = sink.getSinkName();
+        List<TransformResponse> filterList = listTransform(sink.getInlongGroupId(), sink.getInlongStreamId())
+                .stream()
+                // a transform without post nodes cannot feed this sink; skipping it also avoids an NPE on split
+                .filter(v -> StringUtils.isNotBlank(v.getPostNodeNames()))
+                .filter(v -> {
+                    Set<String> postNodes = Sets.newHashSet(v.getPostNodeNames().split(InlongConstants.COMMA));
+                    return postNodes.contains(sinkName);
+                })
+                .collect(Collectors.toList());
+        List<StreamSinkFieldEntity> sinkFieldEntityList = sinkFieldEntityMapper.selectBySinkId(sink.getId());
+        return TransformSqlParser.parse(filterList,
+                CommonBeanUtils.copyListProperties(sinkFieldEntityList, SinkField::new));
+    }
+
+    /**
+     * Renders the given transform request as transform SQL without persisting anything.
+     *
+     * <p>Each request field is copied to a {@link SinkField} whose source field name is set from its origin
+     * field name, and the request itself is copied to a single {@link TransformResponse}; both are handed to
+     * {@code TransformSqlParser.parse}.
+     *
+     * @param request transform configuration to convert
+     * @param operator name of the operator; not used by this method
+     * @return the SQL text produced by {@code TransformSqlParser.parse}
+     */
+    @Override
+    public String parseTransformSql(TransformRequest request, String operator) {
+        List<SinkField> sinkFields = CommonBeanUtils.copyListProperties(request.getFieldList(), SinkField::new);
+        sinkFields.forEach(v -> v.setSourceFieldName(v.getOriginFieldName()));
+        List<TransformResponse> filterList =
+                Collections.singletonList(CommonBeanUtils.copyProperties(request, TransformResponse::new));
+        // sinkFields is already a private copy made above, so it is passed on as-is instead of being copied again
+        return TransformSqlParser.parse(filterList, sinkFields);
+    }
+
+    /**
+     * Recomputes and stores the transform SQL of every sink that the transform feeds.
+     *
+     * <p>Only acts when the group's mode equals {@code InlongConstants.STANDARD_MODE}: each sink of the stream
+     * whose name is listed in the request's comma-separated post node names gets its transform SQL rebuilt via
+     * {@link #getTransformSql(StreamSink)} and is then updated through the sink service.
+     *
+     * @param request transform request that has just been saved or updated
+     * @param operator name of the operator, forwarded to the sink update
+     */
+    private void saveTransformSql(TransformRequest request, String operator) {
+        LOGGER.info("begin to save transform sql: {}", request);
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        // a missing group cannot be in standard mode, so there is nothing to refresh (and nothing to dereference)
+        if (groupEntity == null || !InlongConstants.STANDARD_MODE.equals(groupEntity.getInlongGroupMode())) {
+            return;
+        }
+        String postNodeNames = request.getPostNodeNames();
+        // without post nodes no sink can match; return instead of failing on split
+        if (StringUtils.isBlank(postNodeNames)) {
+            return;
+        }
+        Set<String> postNodes = Sets.newHashSet(postNodeNames.split(InlongConstants.COMMA));
+        List<StreamSink> sinkList = sinkService.listSink(groupId, streamId);
+        for (StreamSink sink : sinkList) {
+            if (!postNodes.contains(sink.getSinkName())) {
+                continue;
+            }
+            SinkRequest sinkRequest = sink.genSinkRequest();
+            sinkRequest.setTransformSql(getTransformSql(sink));
+            sinkService.update(sinkRequest, operator);
+        }
+    }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/TransformSqlParser.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/TransformSqlParser.java
new file mode 100644
index 0000000000..5bac783cb1
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/TransformSqlParser.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.transform;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sort.util.FilterFunctionUtils;
+import org.apache.inlong.manager.pojo.sort.util.StreamParseUtils;
+import org.apache.inlong.manager.pojo.transform.TransformDefinition;
+import org.apache.inlong.manager.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.pojo.transform.filter.FilterDefinition;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Renders transform configuration as one SQL statement of the form
+ * {@code SELECT <fields> FROM SOURCE [WHERE <conditions>]}.
+ *
+ * <p>Only {@code FILTER} transforms contribute SQL; the other transform types listed in
+ * {@code getFilterSql} are accepted but add nothing to the statement.
+ */
+public class TransformSqlParser {
+
+    private static final Logger log = LoggerFactory.getLogger(TransformSqlParser.class);
+
+    /**
+     * Builds the transform SQL for the given transforms and sink fields.
+     *
+     * @param transformResponseList transforms whose filter conditions form the WHERE clause
+     * @param sinkFields fields whose source field names form the select list, in list order
+     * @return the SQL statement with every backtick removed
+     * @throws UnsupportedOperationException if a transform type is not handled by this parser
+     */
+    public static String parse(List<TransformResponse> transformResponseList, List<SinkField> sinkFields) {
+        log.info("start to parse transform sql for transform list={}", transformResponseList);
+        String sql = new StringBuilder()
+                .append(genSimpleSelectSql(sinkFields))
+                .append(getFilterSql(transformResponseList))
+                .toString()
+                // Literal replacement: the backtick is not a pattern, so no regex is needed.
+                // NOTE(review): this also strips backticks inside filter constants - confirm that is intended.
+                .replace("`", "");
+        // Log the statement that is actually returned, i.e. after the backticks were removed.
+        log.info("success to parse transform for transform list={}, result={}", transformResponseList, sql);
+        return sql;
+    }
+
+    /**
+     * Builds {@code SELECT `f1`,`f2` FROM SOURCE} from the source field names of the given fields.
+     *
+     * <p>The separator is written before every field except the first, so an empty list leaves the
+     * {@code SELECT} keyword untouched instead of trimming its trailing character.
+     *
+     * @param fields fields to select, in output order
+     * @return the select part of the statement
+     */
+    private static StringBuilder genSimpleSelectSql(List<SinkField> fields) {
+        StringBuilder sb = new StringBuilder("SELECT ");
+        String separator = "";
+        for (SinkField field : fields) {
+            sb.append(separator).append("`").append(field.getSourceFieldName()).append("`");
+            separator = ",";
+        }
+        return sb.append(" FROM SOURCE");
+    }
+
+    /**
+     * Builds the WHERE clause from all {@code FILTER} transforms in the list.
+     *
+     * <p>The clause is emitted at most once. A single filter is written as-is; when several filters apply,
+     * each condition is parenthesized and the conditions are joined with {@code AND}. Filters that yield
+     * no condition are skipped, so no dangling {@code WHERE} is produced.
+     * NOTE(review): AND is an interpretation of "every filter feeding the sink applies" - confirm.
+     *
+     * @param filterList transforms to inspect
+     * @return the WHERE clause with a leading space, or an empty builder when there is no condition
+     * @throws UnsupportedOperationException if a transform type is not handled by this parser
+     */
+    private static StringBuilder getFilterSql(List<TransformResponse> filterList) {
+        StringBuilder conditions = new StringBuilder();
+        int conditionCount = 0;
+        for (TransformResponse transformResponse : filterList) {
+            TransformType transformType = TransformType.forType(transformResponse.getTransformType());
+            TransformDefinition transformDefinition = StreamParseUtils.parseTransformDefinition(
+                    transformResponse.getTransformDefinition(), transformType);
+            String transformName = transformResponse.getTransformName();
+            switch (transformType) {
+                case FILTER:
+                    String condition = formatCondition((FilterDefinition) transformDefinition, transformName);
+                    if (condition.isEmpty()) {
+                        // nothing to add; a bare WHERE would make the statement invalid
+                        break;
+                    }
+                    if (conditionCount == 0) {
+                        conditions.append(condition);
+                    } else {
+                        if (conditionCount == 1) {
+                            // a second filter arrived: parenthesize the first so AND joins whole conditions
+                            conditions.insert(0, "(").append(")");
+                        }
+                        conditions.append(" AND (").append(condition).append(")");
+                    }
+                    conditionCount++;
+                    break;
+                case SPLITTER:
+                case STRING_REPLACER:
+                case ENCRYPT:
+                case DE_DUPLICATION:
+                case JOINER:
+                case LOOKUP_JOINER:
+                case TEMPORAL_JOINER:
+                case INTERVAL_JOINER:
+                    // accepted, but these types do not contribute to the generated SQL
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            String.format("Unsupported transformType=%s", transformType));
+            }
+        }
+        StringBuilder filterSql = new StringBuilder();
+        if (conditionCount > 0) {
+            filterSql.append(" WHERE").append(InlongConstants.BLANK).append(conditions);
+        }
+        return filterSql;
+    }
+
+    /**
+     * Formats the filter functions of one filter definition, separated by {@code InlongConstants.BLANK}.
+     *
+     * @param filterDefinition definition of the filter transform
+     * @param transformName name of the transform, passed on to the filter function factory
+     * @return the formatted condition, or an empty string when the definition yields no function
+     */
+    private static String formatCondition(FilterDefinition filterDefinition, String transformName) {
+        List<FilterFunction> filterFunctions =
+                FilterFunctionUtils.createFilterFunctions(filterDefinition, transformName);
+        StringBuilder condition = new StringBuilder();
+        for (FilterFunction filterFunction : filterFunctions) {
+            if (condition.length() > 0) {
+                condition.append(InlongConstants.BLANK);
+            }
+            condition.append(filterFunction.format());
+        }
+        return condition.toString();
+    }
+
+}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java
index dc7ec96b4e..6b9533e963 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java
@@ -104,4 +104,11 @@ public class StreamTransformController {
return
Response.success(transformFunctionDocService.listByCondition(request));
}
+    /**
+     * Converts the posted transform configuration to transform SQL on behalf of the login user.
+     *
+     * @param request validated transform configuration from the request body
+     * @return success response carrying the string returned by the transform service
+     */
+    @RequestMapping(value = "/transform/parseTransformSql", method = RequestMethod.POST)
+    @ApiOperation(value = "Parse stream transform sql")
+    public Response<String> parseTransformSql(@Validated @RequestBody TransformRequest request) {
+        final String operator = LoginUserUtils.getLoginUser().getName();
+        final String transformSql = streamTransformService.parseTransformSql(request, operator);
+        return Response.success(transformSql);
+    }
+
+
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
index ce22dc2010..2962452190 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
@@ -105,4 +105,11 @@ public class OpenStreamTransformController {
@Validated @RequestBody TransformFunctionDocRequest request) {
return
Response.success(transformFunctionDocService.listByCondition(request));
}
+
+    /**
+     * Converts the posted transform configuration to transform SQL on behalf of the login user.
+     *
+     * @param request validated transform configuration from the request body
+     * @return success response carrying the string returned by the transform service
+     */
+    @RequestMapping(value = "/transform/parseTransformSql", method = RequestMethod.POST)
+    @ApiOperation(value = "Parse stream transform sql")
+    public Response<String> parseTransformSql(@Validated @RequestBody TransformRequest request) {
+        final String operator = LoginUserUtils.getLoginUser().getName();
+        final String transformSql = streamTransformService.parseTransformSql(request, operator);
+        return Response.success(transformSql);
+    }
}