luchunliang opened a new issue, #10023:
URL: https://github.com/apache/inlong/issues/10023
### Description
## Motivation
- Agent (File collection) needs the ability to filter and collect valid
data content.
- Agent (Pulsar collection) requires PB protocol parsing and data
extraction capabilities.
## Solution
- Agent integrates Transform as an SDK. Manager also integrates Transform so
that it can validate the transformation SQL up front, when users configure
it.
- Before any transformation runs, the Agent must register the transformation
configuration pulled from Manager with Transform. When the transformation
configuration changes, the Agent must re-register it with Transform, keyed by
StreamSourceId.
- Agent-Sink passes StreamSourceId and RawData into Transform, and Transform
returns zero or more FormalData. Agent-Sink sends the final FormalData to
DataProxy.
- Transform holds one set of registered configurations per StreamSourceId,
and each StreamSourceId belongs to exactly one InLong data stream, identified
by its GroupId and StreamId.
- Transform's transformation configuration includes three parts:
transformation Source, transformation SQL, and transformation Sink.
- Transformation SQL initially supports basic field filtering and field
cropping. Date/time conversion functions and string conversion functions
will be added later, modeled on Flink's built-in functions.
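
As a rough illustration of the registration flow described above, the sketch below models a three-part configuration and an Agent-side helper that re-registers only when the pulled configuration differs from the one already held for that StreamSourceId. The names `SketchTransformConfig`, `SketchConfigSync`, and `onConfigPulled` are hypothetical and not part of the proposal, and the "only when changed" comparison is an assumption about how change detection might work.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical three-part configuration: transformation Source, SQL, and Sink.
final class SketchTransformConfig {
    final String sourceInfo;   // how raw data is parsed (placeholder string)
    final String transformSql; // transformation SQL text
    final String sinkInfo;     // how results are encoded (placeholder string)

    SketchTransformConfig(String sourceInfo, String transformSql, String sinkInfo) {
        this.sourceInfo = Objects.requireNonNull(sourceInfo);
        this.transformSql = Objects.requireNonNull(transformSql);
        this.sinkInfo = Objects.requireNonNull(sinkInfo);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchTransformConfig)) {
            return false;
        }
        SketchTransformConfig other = (SketchTransformConfig) o;
        return sourceInfo.equals(other.sourceInfo)
                && transformSql.equals(other.transformSql)
                && sinkInfo.equals(other.sinkInfo);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sourceInfo, transformSql, sinkInfo);
    }
}

// Hypothetical Agent-side bookkeeping: one configuration per StreamSourceId.
final class SketchConfigSync {
    private final Map<String, SketchTransformConfig> registered = new ConcurrentHashMap<>();

    // Handles a configuration pulled from Manager. Returns true when the
    // configuration is new or changed and was therefore (re-)registered.
    boolean onConfigPulled(String streamSourceId, SketchTransformConfig pulled) {
        SketchTransformConfig previous = registered.get(streamSourceId);
        if (pulled.equals(previous)) {
            return false; // unchanged, nothing to re-register
        }
        // A real Agent would call the Transform SDK's register(...) here.
        registered.put(streamSourceId, pulled);
        return true;
    }

    SketchTransformConfig current(String streamSourceId) {
        return registered.get(streamSourceId);
    }
}
```

Comparing configurations by value keeps repeated pulls of an unchanged configuration from triggering a recompilation of the SQL; whether the Agent should instead compare a version number from Manager is left open here.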

## Configuration Model

## Interface API of Transform SDK
- TransformConfig register(String streamSourceId, TransformConfig config) throws TransformException;
  - If the transformation SQL fails to compile, an exception is thrown.
  - The configuration is checked for validity and for null values; an exception is thrown if the check fails.
  - Code is generated automatically from ProtoDefine and compiled using a dynamic classloader; if this fails, an exception is thrown.
  - If configuration validation succeeds, the previous TransformConfig is returned; if the configuration is new, null is returned.
- TransformConfig unregister(String streamSourceId);
  - Unregisters the configuration and returns the previous TransformConfig; returns null if none exists.
- List<byte[]> transform(String streamSourceId, byte[] rawdata) throws TransformException;
  - Synchronous interface
  - Thread-safe
  - Processing logic:
    - Based on the SourceInfo configuration, parse rawdata and generate Map<String, byte[]> or ProtoObject/JsonObject.
    - Interpret the syntax tree generated from the transformation SQL and generate the result set List<Row>.
    - Based on the SinkInfo configuration, convert the result set List<Row> into List<byte[]> and return it.
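
To make the contract above more concrete, here is a minimal in-memory sketch of the three calls. It is not the proposed SDK: `ToyTransformConfig` and `ToyTransform` are hypothetical names, the source format is assumed to be `|`-delimited UTF-8 text, and the transformation SQL is replaced by a plain Java predicate plus a list of output fields (no SQL is parsed). Throwing when no configuration is registered for a StreamSourceId is also an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

class TransformException extends Exception {
    TransformException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for TransformConfig: field names used for parsing,
// a predicate standing in for the SQL filter, and the fields kept in the output.
final class ToyTransformConfig {
    final List<String> sourceFields;
    final Predicate<Map<String, String>> filter;
    final List<String> sinkFields;

    ToyTransformConfig(List<String> sourceFields,
            Predicate<Map<String, String>> filter,
            List<String> sinkFields) {
        this.sourceFields = sourceFields;
        this.filter = filter;
        this.sinkFields = sinkFields;
    }
}

final class ToyTransform {
    // ConcurrentHashMap keeps register/unregister/transform safe to call concurrently.
    private final Map<String, ToyTransformConfig> configs = new ConcurrentHashMap<>();

    // Validates the configuration, then returns the previous one (null if new).
    ToyTransformConfig register(String streamSourceId, ToyTransformConfig config)
            throws TransformException {
        if (streamSourceId == null || config == null || config.sourceFields == null
                || config.filter == null || config.sinkFields == null) {
            throw new TransformException("configuration must not be null");
        }
        if (!config.sourceFields.containsAll(config.sinkFields)) {
            throw new TransformException("sink fields must exist in the source");
        }
        return configs.put(streamSourceId, config);
    }

    // Returns the removed configuration, or null if none was registered.
    ToyTransformConfig unregister(String streamSourceId) {
        return configs.remove(streamSourceId);
    }

    List<byte[]> transform(String streamSourceId, byte[] rawdata) throws TransformException {
        ToyTransformConfig config = streamSourceId == null ? null : configs.get(streamSourceId);
        if (config == null) {
            throw new TransformException("no configuration for " + streamSourceId);
        }
        // Step 1: parse rawdata according to the source definition.
        String[] values = new String(rawdata, StandardCharsets.UTF_8).split("\\|", -1);
        if (values.length != config.sourceFields.size()) {
            throw new TransformException("unexpected field count: " + values.length);
        }
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < values.length; i++) {
            row.put(config.sourceFields.get(i), values[i]);
        }
        // Step 2: filter (the SQL WHERE clause in the real design).
        if (!config.filter.test(row)) {
            return Collections.emptyList();
        }
        // Step 3: keep only the sink fields and encode the result.
        List<String> kept = new ArrayList<>();
        for (String field : config.sinkFields) {
            kept.add(row.get(field));
        }
        byte[] encoded = String.join("|", kept).getBytes(StandardCharsets.UTF_8);
        return Collections.singletonList(encoded);
    }
}
```

With source fields `level`, `msg`, `ts`, a filter that keeps rows whose `level` is `ERROR`, and sink fields `ts`, `msg`, the input `ERROR|disk full|1700` yields the single record `1700|disk full`, while `INFO|ok|1701` yields an empty list, which matches the "zero or more FormalData" behavior described in the Solution section.
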
### Use case
_No response_
### Are you willing to submit PR?
- [X] Yes, I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://www.apache.org/foundation/policies/conduct)