This is an automated email from the ASF dual-hosted git repository.
wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b729a9d1c6 [INLONG-11877][Manager] Support verifying transform SQL
(#11878)
b729a9d1c6 is described below
commit b729a9d1c6699d7748fdf9a8cde713410ffc6400
Author: fuweng11 <[email protected]>
AuthorDate: Thu Jun 5 15:09:43 2025 +0800
[INLONG-11877][Manager] Support verifying transform SQL (#11878)
* [INLONG-11877][Manager] Support verifying transform SQL
---
.../client/api/inner/client/StreamSinkClient.java | 15 ++++++
.../manager/client/api/service/StreamSinkApi.java | 4 ++
.../manager/pojo/sink/TransformParseRequest.java | 63 ++++++++++++++++++++++
.../service/datatype/CsvDataTypeOperator.java | 55 +++++++++++++++++++
.../manager/service/datatype/DataTypeOperator.java | 8 +++
.../service/datatype/KvDataTypeOperator.java | 61 +++++++++++++++++++++
.../manager/service/sink/StreamSinkService.java | 10 ++++
.../service/sink/StreamSinkServiceImpl.java | 37 +++++++++++++
.../web/controller/StreamSinkController.java | 6 +++
.../openapi/OpenStreamSinkController.java | 8 +++
10 files changed, 267 insertions(+)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
index c350900f06..202b5c24a9 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
@@ -30,10 +30,12 @@ import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
+import java.util.Map;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
@@ -171,4 +173,17 @@ public class StreamSinkClient {
ParseFieldRequest request =
ParseFieldRequest.builder().method(method).statement(statement).build();
return parseFields(request);
}
+
+ /**
+ * Parse transform sql for data
+ *
+ * @param transformParseRequest the request for parse transform
+ * @return result of parse result
+ */
+ public Map<String, Object> parseTransform(TransformParseRequest
transformParseRequest) {
+ Response<Map<String, Object>> response =
+
ClientUtils.executeHttpCall(streamSinkApi.parseTransform(transformParseRequest));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
index 75088a5eee..168ef85d0f 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
import retrofit2.Call;
import retrofit2.http.Body;
@@ -36,6 +37,7 @@ import retrofit2.http.Path;
import retrofit2.http.Query;
import java.util.List;
+import java.util.Map;
public interface StreamSinkApi {
@@ -67,4 +69,6 @@ public interface StreamSinkApi {
@POST("sink/parseFields")
Call<Response<List<SinkField>>> parseFields(@Body ParseFieldRequest
parseFieldRequest);
+ @POST("sink/parseTransform")
+ Call<Response<Map<String, Object>>> parseTransform(@Body
TransformParseRequest request);
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/TransformParseRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/TransformParseRequest.java
new file mode 100644
index 0000000000..94d9d79615
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/TransformParseRequest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink;
+
+import org.apache.inlong.manager.common.validation.SaveValidation;
+import org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.hibernate.validator.constraints.Length;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+import java.util.List;
+
+/**
+ * Request for Dirty data
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@ApiModel("Transform parse request")
+public class TransformParseRequest {
+
+ @ApiModelProperty("Inlong group id")
+ @NotBlank(groups = {SaveValidation.class, UpdateByKeyValidation.class},
message = "inlongGroupId cannot be blank")
+ @Length(min = 4, max = 256, message = "length must be between 4 and 200")
+ @Pattern(regexp = "^[a-zA-Z0-9_.-]{4,200}$", message = "only supports
letters, numbers, '.', '-', or '_'")
+ private String inlongGroupId;
+
+ @ApiModelProperty("Inlong stream id")
+ @NotBlank(groups = {SaveValidation.class, UpdateByKeyValidation.class},
message = "inlongStreamId cannot be blank")
+ @Length(min = 1, max = 256, message = "inlongStreamId length must be
between 1 and 200")
+ @Pattern(regexp = "^[a-zA-Z0-9_.-]{1,200}$", message = "inlongStreamId
only supports letters, numbers, '.', '-', or '_'")
+ private String inlongStreamId;
+
+ @ApiModelProperty("Transform sql")
+ private String transformSql;
+
+ @ApiModelProperty("Data")
+ private String data;
+
+ @ApiModelProperty("Sink field list")
+ private List<SinkField> sinkFieldList;
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
index 4f12339014..f0aaa79713 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
@@ -20,16 +20,28 @@ package org.apache.inlong.manager.service.datatype;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
import org.apache.inlong.sdk.transform.decode.SplitUtils;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
@Slf4j
@Service
@@ -84,4 +96,47 @@ public class CsvDataTypeOperator implements DataTypeOperator
{
csvConfig.setEscapeChar(escape);
return csvConfig;
}
+
+ @Override
+ public Map<String, Object> parseTransform(InlongStreamInfo streamInfo,
List<SinkField> fieldList,
+ String transformSql,
+ String data) {
+ try {
+ List<org.apache.inlong.sdk.transform.pojo.FieldInfo> srcFields =
new ArrayList<>();
+ List<org.apache.inlong.sdk.transform.pojo.FieldInfo> dstFields =
new ArrayList<>();
+ for (StreamField streamField : streamInfo.getFieldList()) {
+ if (StringUtils.isNotBlank(streamField.getFieldName())) {
+ srcFields.add(
+ new
org.apache.inlong.sdk.transform.pojo.FieldInfo(streamField.getFieldName(),
+ TypeConverter.DefaultTypeConverter()));
+ }
+ }
+ for (SinkField sinkField : fieldList) {
+ String targetFieldName = sinkField.getFieldName();
+ if (StringUtils.isNotBlank(targetFieldName)) {
+ dstFields.add(new
org.apache.inlong.sdk.transform.pojo.FieldInfo(targetFieldName));
+ }
+ }
+ char separator = '&';
+ if (StringUtils.isNotBlank(streamInfo.getDataSeparator())) {
+ separator = (char)
Integer.parseInt(streamInfo.getDataSeparator());
+ }
+ Character escape = null;
+ if (StringUtils.isNotBlank(streamInfo.getDataEscapeChar())) {
+ escape = streamInfo.getDataEscapeChar().charAt(0);
+ }
+ CsvSourceInfo csvSource = new
CsvSourceInfo(streamInfo.getDataEncoding(), separator, escape, srcFields);
+ MapSinkInfo mapSinkInfo = new
MapSinkInfo(streamInfo.getDataEncoding(), dstFields);
+ TransformConfig config = new TransformConfig(transformSql);
+ TransformProcessor<String, Map<String, Object>> processor =
TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createMapEncoder(mapSinkInfo));
+ List<Map<String, Object>> result = processor.transform(data);
+ log.info("success parse transform sql result={}", result);
+ return result.get(0);
+ } catch (Exception e) {
+ log.error("parse transform sql failed", e);
+ throw new BusinessException("parse transform sql failed" +
e.getMessage());
+ }
+ }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
index ad1a577473..7919a56a6d 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
@@ -22,9 +22,11 @@ import
org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import java.util.List;
+import java.util.Map;
/**
* Data type operator
@@ -51,4 +53,10 @@ public interface DataTypeOperator {
String.format("current type is not support for data type=%s",
streamInfo.getDataType()));
}
+ default Map<String, Object> parseTransform(InlongStreamInfo streamInfo,
List<SinkField> fieldList,
+ String transformSql, String data) {
+ throw new BusinessException(
+ String.format("current type is not support for data type=%s",
streamInfo.getDataType()));
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
index 900732f906..a2957baace 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
@@ -20,15 +20,26 @@ package org.apache.inlong.manager.service.datatype;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.sdk.transform.decode.KvUtils;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -101,4 +112,54 @@ public class KvDataTypeOperator implements
DataTypeOperator {
kvConfig.setEscapeChar(escape);
return kvConfig;
}
+
+ @Override
+ public Map<String, Object> parseTransform(InlongStreamInfo streamInfo,
List<SinkField> fieldList,
+ String transformSql,
+ String data) {
+ try {
+ List<org.apache.inlong.sdk.transform.pojo.FieldInfo> srcFields =
new ArrayList<>();
+ List<org.apache.inlong.sdk.transform.pojo.FieldInfo> dstFields =
new ArrayList<>();
+ for (StreamField streamField : streamInfo.getFieldList()) {
+ if (StringUtils.isNotBlank(streamField.getFieldName())) {
+ srcFields.add(
+ new
org.apache.inlong.sdk.transform.pojo.FieldInfo(streamField.getFieldName(),
+ TypeConverter.DefaultTypeConverter()));
+ }
+ }
+ for (SinkField sinkField : fieldList) {
+ String targetFieldName = sinkField.getFieldName();
+ if (StringUtils.isNotBlank(targetFieldName)) {
+ dstFields.add(new
org.apache.inlong.sdk.transform.pojo.FieldInfo(targetFieldName));
+ }
+ }
+ char separator = '&';
+ if (StringUtils.isNotBlank(streamInfo.getDataSeparator())) {
+ separator = (char)
Integer.parseInt(streamInfo.getDataSeparator());
+ }
+ Character escape = null;
+ if (StringUtils.isNotBlank(streamInfo.getDataEscapeChar())) {
+ escape = streamInfo.getDataEscapeChar().charAt(0);
+ }
+ char kvSeparator = '=';
+ if (StringUtils.isNotBlank(streamInfo.getKvSeparator())) {
+ kvSeparator = (char)
Integer.parseInt(streamInfo.getKvSeparator());
+ }
+ KvSourceInfo kvSourceInfo = new
KvSourceInfo(streamInfo.getDataEncoding(), srcFields);
+ kvSourceInfo.setEscapeChar(escape);
+ kvSourceInfo.setEntryDelimiter(separator);
+ kvSourceInfo.setKvDelimiter(kvSeparator);
+ MapSinkInfo mapSinkInfo = new
MapSinkInfo(streamInfo.getDataEncoding(), dstFields);
+ TransformConfig config = new TransformConfig(transformSql);
+ TransformProcessor<String, Map<String, Object>> processor =
TransformProcessor
+ .create(config,
SourceDecoderFactory.createKvDecoder(kvSourceInfo),
+ SinkEncoderFactory.createMapEncoder(mapSinkInfo));
+ List<Map<String, Object>> result = processor.transform(data);
+ log.info("success parse transform sql result={}", result);
+ return result.get(0);
+ } catch (Exception e) {
+ log.error("parse transform sql failed", e);
+ throw new BusinessException("parse transform sql failed" +
e.getMessage());
+ }
+ }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index a0b6a29443..bd6d915227 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.UserInfo;
@@ -247,4 +248,13 @@ public interface StreamSinkService {
* @return list of sink field
*/
List<SinkField> parseFields(ParseFieldRequest parseFieldRequest);
+
+ /**
+ * Parse transform sql for data
+ *
+ * @param request the request for parse transform
+ * @return result of parse result
+ */
+ Map<String, Object> parseTransform(TransformParseRequest request);
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 431d44b3d0..e306c1f6f1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.sink;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.OperationTarget;
@@ -32,12 +33,14 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
import org.apache.inlong.manager.dao.entity.SortConfigEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
@@ -59,8 +62,12 @@ import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.user.UserInfo;
+import org.apache.inlong.manager.service.datatype.DataTypeOperator;
+import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
import org.apache.inlong.manager.service.group.GroupCheckService;
import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
import org.apache.inlong.manager.service.user.UserService;
@@ -106,6 +113,7 @@ import static
org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NO
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+import static
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
import static
org.apache.inlong.manager.service.resource.queue.pulsar.PulsarQueueResourceOperator.PULSAR_SUBSCRIPTION;
import static
org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQQueueResourceOperator.TUBE_CONSUMER_GROUP;
@@ -130,12 +138,16 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
@Autowired
private InlongStreamEntityMapper streamMapper;
@Autowired
+ private InlongStreamFieldEntityMapper streamFieldMapper;
+ @Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
private StreamSinkEntityMapper sinkMapper;
@Autowired
private StreamSinkFieldEntityMapper sinkFieldMapper;
@Autowired
+ public DataTypeOperatorFactory dataTypeOperatorFactory;
+ @Autowired
private AutowireCapableBeanFactory autowireCapableBeanFactory;
@Autowired
private ObjectMapper objectMapper;
@@ -746,6 +758,31 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
}
}
+ @Override
+ public Map<String, Object> parseTransform(TransformParseRequest request) {
+ LOGGER.info("begin to parse transform for data: {}", request);
+ // Check whether the stream exist or not
+ InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
+ request.getInlongGroupId(), request.getInlongStreamId());
+ Preconditions.expectNotNull(streamEntity,
ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
+
Preconditions.expectTrue(CollectionUtils.isNotEmpty(request.getSinkFieldList()),
+ ErrorCodeEnum.SINK_FIELD_LIST_IS_EMPTY.getMessage());
+ Preconditions.expectNotBlank(request.getTransformSql(), "Transform sql
is empty");
+ DataTypeOperator dataTypeOperator =
+
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamEntity.getDataType()));
+ InlongStreamInfo streamInfo =
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
+ unpackExtParams(streamEntity.getExtParams(), streamInfo);
+ List<InlongStreamFieldEntity> fieldEntityList =
+
streamFieldMapper.selectByIdentifier(request.getInlongGroupId(),
request.getInlongStreamId());
+ List<StreamField> streamFields =
CommonBeanUtils.copyListProperties(fieldEntityList, StreamField::new);
+ streamInfo.setFieldList(streamFields);
+ Map<String, Object> result =
+ dataTypeOperator.parseTransform(streamInfo,
request.getSinkFieldList(), request.getTransformSql(),
+ request.getData());
+ LOGGER.info("success to parse transform for data: {}, result={}",
request, result);
+ return result;
+ }
+
private List<SinkField> parseFieldsByCsv(String statement) {
String[] lines = statement.split(InlongConstants.NEW_LINE);
List<SinkField> fields = new ArrayList<>();
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 331e3a5355..588dfec8fc 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.service.dirtyData.DirtyQueryLogService;
import org.apache.inlong.manager.service.operationlog.OperationLog;
@@ -181,4 +182,9 @@ public class StreamSinkController {
return Response.success(dirtyQueryLogService.getSqlTaskStatus(taskId));
}
+ @RequestMapping(value = "/sink/parseTransform", method =
RequestMethod.POST)
+ @ApiOperation(value = "parse transform sql from data")
+ public Response<Map<String, Object>> parseTransform(@RequestBody
TransformParseRequest request) {
+ return Response.success(sinkService.parseTransform(request));
+ }
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
index e999e1ba28..1c313f77e3 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.service.operationlog.OperationLog;
import org.apache.inlong.manager.service.sink.StreamSinkService;
@@ -45,6 +46,7 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
+import java.util.Map;
/**
* Open InLong Stream Sink controller
@@ -113,4 +115,10 @@ public class OpenStreamSinkController {
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(sinkService.delete(id, startProcess,
LoginUserUtils.getLoginUser().getName()));
}
+
+ @RequestMapping(value = "/sink/parseTransform", method =
RequestMethod.POST)
+ @ApiOperation(value = "parse transform sql from data")
+ public Response<Map<String, Object>> parseTransform(@RequestBody
TransformParseRequest request) {
+ return Response.success(sinkService.parseTransform(request));
+ }
}