luchunliang opened a new issue, #10023:
URL: https://github.com/apache/inlong/issues/10023

### Description

## Motivation
- Agent (file collection) needs to filter the collected data and keep only valid content.
- Agent (Pulsar collection) needs to parse Protobuf (PB) messages and extract data from them.
   
   ## Solution
- Agent integrates Transform as an SDK. Manager also integrates Transform so that transformation SQL can be validated when users configure it, before any data is transformed.
- Before transforming data, Agent registers the transformation configuration pulled from Manager with Transform. When the configuration changes, Agent re-registers it with Transform, keyed by StreamSourceId.
- Agent-Sink passes the StreamSourceId and RawData to Transform, which returns zero or more FormalData records. Agent-Sink then sends the resulting FormalData to DataProxy.
- Transform holds one registered configuration per StreamSourceId, and each StreamSourceId belongs to exactly one InLong data stream, identified by its GroupId and StreamId.
- A transformation configuration has three parts: the transformation Source, the transformation SQL, and the transformation Sink.
- Initially, transformation SQL supports basic field filtering and field cropping. Date/time and string conversion functions will be added later, based on Flink's built-in functions.
   
![image](https://github.com/apache/inlong/assets/8925507/32800fbb-bb90-4c6c-853d-f9f511b6815b)
   
   ## Configuration Model
   
![image](https://github.com/apache/inlong/assets/8925507/40edb2ef-633b-481b-b22c-a5f4b99805a9)
   
## Interface API of Transform SDK
- `TransformConfig register(String streamSourceId, TransformConfig config) throws TransformException;`
  - If the transformation SQL fails to compile, an exception is thrown.
  - The configuration is checked for validity and for required non-null values; if the check fails, an exception is thrown.
  - Code is generated automatically from ProtoDefine and compiled with a dynamic class loader; if this fails, an exception is thrown.
  - If validation succeeds, the previous `TransformConfig` is returned; for a new configuration, `null` is returned.
- `TransformConfig unregister(String streamSourceId);`
  - Unregisters the configuration and returns the previous `TransformConfig`, or `null` if none exists.
- `List<byte[]> transform(String streamSourceId, byte[] rawdata) throws TransformException;`
  - Synchronous interface.
  - Thread-safe.
  - Processing logic:
    - Parse `rawdata` according to the SourceInfo configuration, producing a `Map<String, byte[]>` or a ProtoObject/JsonObject.
    - Interpret the syntax tree generated from the transformation SQL to produce the result set `List<Row>`.
    - Convert the result set `List<Row>` into `List<byte[]>` according to the SinkInfo configuration, and return it.
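The register/unregister contract maps naturally onto `ConcurrentHashMap`, whose `put` and `remove` return the previous value or `null`, and which also gives the thread safety required of `transform`. The following is a hedged sketch of the SDK surface under that assumption. `TransformSdk`, the simplified `TransformConfig` (a SQL string plus a `compiled` function standing in for the parse/SQL/serialize pipeline), and the choice to throw when no configuration is registered are all hypothetical; SQL compilation and ProtoDefine code generation are only marked by a comment.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of the proposed Transform SDK contract; names and the
// simplified TransformConfig are illustrative, not InLong's actual classes.
public class TransformSdkSketch {

    static class TransformException extends Exception {
        TransformException(String message) {
            super(message);
        }
    }

    /** Stand-in config: SQL text plus a function playing the compiled pipeline. */
    static final class TransformConfig {
        final String sql;
        final Function<byte[], List<byte[]>> compiled;

        TransformConfig(String sql, Function<byte[], List<byte[]>> compiled) {
            this.sql = sql;
            this.compiled = compiled;
        }
    }

    static class TransformSdk {
        // One config per StreamSourceId; ConcurrentHashMap keeps all three
        // methods safe to call from multiple Agent-Sink threads.
        private final Map<String, TransformConfig> configs = new ConcurrentHashMap<>();

        TransformConfig register(String streamSourceId, TransformConfig config)
                throws TransformException {
            // Legality / non-null checks. In the real SDK, SQL compilation and
            // ProtoDefine code generation would also run here and throw on failure.
            if (streamSourceId == null || config == null
                    || config.sql == null || config.compiled == null) {
                throw new TransformException("invalid transform configuration");
            }
            // put() returns the previous config, or null for a new StreamSourceId.
            return configs.put(streamSourceId, config);
        }

        TransformConfig unregister(String streamSourceId) {
            // remove() returns the previous config, or null if none was registered.
            return streamSourceId == null ? null : configs.remove(streamSourceId);
        }

        List<byte[]> transform(String streamSourceId, byte[] rawData)
                throws TransformException {
            TransformConfig config = streamSourceId == null ? null : configs.get(streamSourceId);
            if (config == null) {
                // Assumption: the proposal does not say what happens here.
                throw new TransformException("no config for " + streamSourceId);
            }
            // Stands in for: parse (SourceInfo) -> evaluate SQL -> serialize (SinkInfo).
            return config.compiled.apply(rawData);
        }
    }

    public static void main(String[] args) throws TransformException {
        TransformSdk sdk = new TransformSdk();
        // Placeholder pipeline: drop empty records, pass others through unchanged.
        TransformConfig v1 = new TransformConfig("<placeholder SQL>",
                raw -> raw.length == 0 ? List.<byte[]>of() : List.of(raw));
        System.out.println(sdk.register("source-1", v1) == null); // true (new config)
        System.out.println(sdk.register("source-1", v1) == v1);   // true (previous returned)
        System.out.println(sdk.transform("source-1", new byte[0]).size());    // 0
        System.out.println(sdk.transform("source-1", new byte[] {1}).size()); // 1
        System.out.println(sdk.unregister("source-1") == v1);     // true
    }
}
```

Keeping the compiled pipeline inside the registered config means `transform` does a single map lookup per call, and a concurrent re-register simply makes subsequent calls pick up the new configuration.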
   
   ### Use case
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes, I am willing to submit a PR!
   
   ### Code of Conduct
   
- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

