luchunliang opened a new pull request, #12126:
URL: https://github.com/apache/inlong/pull/12126

   Fixes #12125 
   
   ### Motivation
   
   This PR introduces several enhancements to the Transform SDK for protobuf 
data processing and SQL alias parsing:
   
   1. Improve PbSourceData.buildFieldValue() with Flink-compatible type 
conversion
   Enhance buildFieldValue() to produce values directly compatible with Flink's 
RowData type system:
   
   - STRING → wrap with BinaryStringData; handle existing BinaryStringData 
instances
   - INT/LONG/FLOAT/DOUBLE/BOOLEAN → validate actual types and convert from 
String if necessary using NumberUtils
   - BYTE_STRING → handle both ByteString and byte[] input; fall back to 
ISO_8859_1 encoding for other types
   - MESSAGE → delegate to buildStructData()
   - Other types → return as-is
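
The coercion rules above can be sketched with plain JDK types. This is only an illustration, not the PR's code: the class and method names are hypothetical, Integer.parseInt stands in for the NumberUtils call, returning null on an unparsable string is an assumption, and the Flink BinaryStringData and protobuf ByteString branches are left out so the example has no external dependencies.

```java
import java.nio.charset.StandardCharsets;

final class FieldCoercionSketch {

    // Hypothetical INT branch: keep an Integer as-is, parse a String,
    // and (assumption) return null when the value cannot be converted.
    static Integer toInt(Object value) {
        if (value instanceof Integer) {
            return (Integer) value;
        }
        if (value instanceof String) {
            try {
                return Integer.parseInt(((String) value).trim());
            } catch (NumberFormatException e) {
                return null;
            }
        }
        return null;
    }

    // Hypothetical BYTE_STRING branch: byte[] is returned unchanged,
    // any other non-null value is encoded from toString() as ISO_8859_1.
    static byte[] toBytes(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        return value.toString().getBytes(StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        System.out.println(toInt("42"));
        System.out.println(toBytes("hi").length);
    }
}
```
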
   2. Improve PbSourceData node value caching with IdentityHashMap
   Replace HashMap with IdentityHashMap for nodeValueCache and mapNodeCache to 
use reference equality (==) instead of equals() when keying by DynamicMessage 
instances. This avoids expensive deep-equality comparisons and ensures correct 
identity-based lookups for protobuf messages.
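
The difference between the two map types can be shown with a small JDK-only sketch. The Message class below is a hypothetical stand-in for DynamicMessage whose equals() compares content; it is not part of the PR.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Objects;

final class IdentityCacheSketch {

    // Hypothetical stand-in for DynamicMessage: equality is content-based.
    static final class Message {
        final String payload;

        Message(String payload) {
            this.payload = payload;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Message
                    && Objects.equals(payload, ((Message) o).payload);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(payload);
        }
    }

    // Puts two distinct instances with equal content into the given cache
    // and reports how many entries the cache ends up with.
    static int distinctKeys(Map<Message, String> cache) {
        Message first = new Message("same content");
        Message second = new Message("same content");
        cache.put(first, "value for first");
        cache.put(second, "value for second");
        return cache.size();
    }

    public static void main(String[] args) {
        // HashMap keys by equals(): the second put overwrites the first.
        System.out.println(distinctKeys(new HashMap<>()));         // prints 1
        // IdentityHashMap keys by ==: each instance has its own entry.
        System.out.println(distinctKeys(new IdentityHashMap<>())); // prints 2
    }
}
```

With a HashMap, two separate messages that happen to have equal content share one cache slot, and every lookup pays for hashCode() and equals() on the message. With an IdentityHashMap, each instance is its own key and lookups compare references only.
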
   
   3. Fix null handling in PbSourceData.findFieldNode() and findChildField()
   Change return value from empty string "" to null when a field cannot be 
found, making the behavior consistent across the codebase and preventing 
downstream logic from treating a missing field as an empty string.
   
   4. Fix DefaultSinkData.getField() to return null for missing fields
   Change getOrDefault(fieldName, "") to get(fieldName) so that missing fields 
return null instead of "". This allows downstream encoders (e.g. 
RowDataSinkEncoder) to correctly distinguish between "field not set" and "field 
set to empty string".
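
The behavioral change is small enough to show with a plain Map. In this sketch the class and method names are hypothetical; only the getOrDefault(fieldName, "") and get(fieldName) calls come from the description above.

```java
import java.util.HashMap;
import java.util.Map;

final class MissingFieldSketch {

    // Old behavior: a missing field is indistinguishable from an empty one.
    static String oldGetField(Map<String, String> fields, String fieldName) {
        return fields.getOrDefault(fieldName, "");
    }

    // New behavior: a missing field yields null.
    static String newGetField(Map<String, String> fields, String fieldName) {
        return fields.get(fieldName);
    }

    public static void main(String[] args) {
        Map<String, String> fields = new HashMap<>();
        fields.put("name", ""); // field explicitly set to empty string

        System.out.println(oldGetField(fields, "age") == null); // prints false
        System.out.println(newGetField(fields, "age") == null); // prints true
        System.out.println(newGetField(fields, "name").isEmpty()); // prints true
    }
}
```
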
   
   5. Add null filtering in array/map field processing
   
   - In getField() array processing: skip null items via if (itemValue != null) 
before adding to result list
   - In buildMapData(): skip map entries where key or value is null via if 
(keyValue != null && valueValue != null)
   
   6. Enhance findNodeValueByCache() with more precise cache-hit handling
   When a path-level cache hit is found at the last node position, apply the 
full node-type resolution logic (array index extraction, map key lookup) rather 
than returning the raw cached value directly. This ensures correct results for 
paths like field(0) or map_field(key) when the parent list/map was cached from 
a prior access.
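
A rough sketch of the idea, assuming the cached parent value is a java.util.List or java.util.Map: the resolveLastNode helper and its parameters are hypothetical and do not mirror the actual findNodeValueByCache() signature. Returning null for an out-of-range index is also an assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class CacheHitSketch {

    // Hypothetical resolver for the last path node.
    // cached: value found in the cache for the node name
    // index:  non-null for paths like field(0)
    // key:    non-null for paths like map_field(key)
    static Object resolveLastNode(Object cached, Integer index, String key) {
        if (cached instanceof List && index != null) {
            List<?> list = (List<?>) cached;
            // Assumption: an out-of-range index resolves to null.
            return (index >= 0 && index < list.size()) ? list.get(index) : null;
        }
        if (cached instanceof Map && key != null) {
            return ((Map<?, ?>) cached).get(key);
        }
        // No index or key on the node: the cached value is the result.
        return cached;
    }

    public static void main(String[] args) {
        List<String> cachedList = Arrays.asList("a", "b");
        Map<String, String> cachedMap = Collections.singletonMap("k", "v");

        System.out.println(resolveLastNode(cachedList, 1, null));  // prints b
        System.out.println(resolveLastNode(cachedMap, null, "k")); // prints v
    }
}
```
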
   
   7. Fix SQL alias parsing for special characters (backtick handling)
   In TransformProcessor.initTransformSql(), strip surrounding backtick 
characters from alias names returned by exprItem.getAlias().getName(). 
JSqlParser's getName() preserves backtick quotes in the returned string, 
causing a mismatch with the sink field list. This fix allows aliases containing 
special characters (e.g., *, $) to be properly matched when wrapped in 
backticks in the SQL.
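
A minimal sketch of the backtick stripping, assuming the alias arrives as a plain String: stripBackticks is a hypothetical helper name, and the PR may implement the check differently inside initTransformSql().

```java
final class AliasSketch {

    // Hypothetical helper: remove one pair of surrounding backticks, if present.
    static String stripBackticks(String alias) {
        if (alias != null
                && alias.length() >= 2
                && alias.charAt(0) == '`'
                && alias.charAt(alias.length() - 1) == '`') {
            return alias.substring(1, alias.length() - 1);
        }
        return alias;
    }

    public static void main(String[] args) {
        System.out.println(stripBackticks("`user$id`")); // prints user$id
        System.out.println(stripBackticks("plain"));     // prints plain
    }
}
```

Only a matching pair at both ends is removed, so an alias without backticks, or one containing a backtick in the middle, is left untouched.
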
   
   ### Modifications
   
   - The Protobuf data → Flink RowData → Iceberg pipeline requires strict type 
compatibility (e.g., BinaryStringData for STRING fields)
   - Identity-based caching avoids O(n) deep-equality comparisons for 
DynamicMessage keys
   - Consistent null semantics prevent silent data corruption when fields are 
missing
   - Aliases containing $ and * characters (common in protobuf map key paths) 
must be parseable in SQL
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
     *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   

