[
https://issues.apache.org/jira/browse/FLINK-38122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ramin Gharib updated FLINK-38122:
---------------------------------
Description:
When using the OBJECT_OF function or CAST with STRUCTURED types in Flink SQL,
if the target Java class does not have a fully defining constructor, the field
ordering is incorrectly determined, leading to corrupted data reads from the
wrong memory segments.
h4. Root Cause:
Flink's structured type extraction logic (in
DataTypeExtractor.createStructuredTypeAttributes()) falls back to alphabetical
field ordering when no constructor is present, rather than preserving the order
of field declarations. This causes a mismatch between the expected field layout
and the actual memory layout when creating structured objects.
h4. Current Behavior:
{code:java}
// Without constructor - fields are ordered alphabetically: [age, name] public
static class User {
public String name; // Declared first
public int age; // Declared second
} {code}
When using:
{code:java}
SELECT OBJECT_OF('com.example.User', 'name', 'Alice', 'age', 30) {code}
The system creates a structured type with field order [age, name]
(alphabetical), but assigns values in the order [name, age] (as specified),
resulting in memory corruption. It outputs:
{code:java}
User info: +I[User{name=' Alice � ',
age=1667853377}]{code}
h4. Workaround:
Add a fully defining constructor that explicitly defines field order:
{code:java}
public static class User {
public String name;
public int age;
// Fully defining constructor enforces field order
public User(String name, int age) {
this.name = name;
this.age = age;
}
} {code}
h4. Analysis:
The issue is in
[flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
at lines
655-668|https://github.com/raminqaf/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java#L669-L669]:
{code:java}
private DataTypes.Field[] createStructuredTypeAttributes(
ExtractionUtils.AssigningConstructor constructor,
Map<String, DataType> fieldDataTypes) {
return Optional.ofNullable(constructor)
.map(c -> c.parameterNames.stream()) // Uses constructor order
.orElseGet(() -> fieldDataTypes.keySet().stream().sorted()) // ←
PROBLEM: Alphabetical sort
.map(name -> DataTypes.FIELD(name, fieldDataTypes.get(name)))
.toArray(DataTypes.Field[]::new);
} {code}
was:
When using the OBJECT_OF function or CAST with STRUCTURED types in Flink SQL,
if the target Java class does not have a fully defining constructor, the field
ordering is incorrectly determined, leading to corrupted data reads from the
wrong memory segments.
h4. Root Cause:
Flink's structured type extraction logic (in
DataTypeExtractor.createStructuredTypeAttributes()) falls back to alphabetical
field ordering when no constructor is present, rather than preserving the order
of field declarations. This causes a mismatch between the expected field layout
and the actual memory layout when creating structured objects.
h4. Current Behavior:
{code:java}
// Without constructor - fields are ordered alphabetically: [age, name] public
static class User {
public String name; // Declared first
public int age; // Declared second
} {code}
When using:
{code:java}
SELECT OBJECT_OF('com.example.User', 'name', 'Alice', 'age', 30) {code}
The system creates a structured type with field order [age, name]
(alphabetical), but assigns values in the order [name, age] (as specified),
resulting in memory corruption. It outputs:
{code:java}
User info: +I[User{name=' Alice � ',
age=1667853377}]{code}
h4. Workaround:
Add a fully defining constructor that explicitly defines field order:
{code:java}
public static class User {
public String name;
public int age;
// Fully defining constructor enforces field order
public User(String name, int age) {
this.name = name;
this.age = age;
}
} {code}
h4. Analysis:
The issue is in
[flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
at lines
655-668|https://github.com/raminqaf/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java#L669-L669]:
{code:java}
private DataTypes.Field[] createStructuredTypeAttributes(
ExtractionUtils.AssigningConstructor constructor,
Map<String, DataType> fieldDataTypes) {
return Optional.ofNullable(constructor)
.map(c -> c.parameterNames.stream()) // Uses constructor order
.orElseGet(() -> fieldDataTypes.keySet().stream().sorted()) // ←
PROBLEM: Alphabetical sort
.map(name -> DataTypes.FIELD(name, fieldDataTypes.get(name)))
.toArray(DataTypes.Field[]::new);
} {code}
> Creating structured types with no fully defining constructor results in wrong
> results
> -------------------------------------------------------------------------------------
>
> Key: FLINK-38122
> URL: https://issues.apache.org/jira/browse/FLINK-38122
> Project: Flink
> Issue Type: Sub-task
> Reporter: Ramin Gharib
> Priority: Major
>
> When using the OBJECT_OF function or CAST with STRUCTURED types in Flink SQL,
> if the target Java class does not have a fully defining constructor, the
> field ordering is incorrectly determined, leading to corrupted data reads
> from the wrong memory segments.
> h4. Root Cause:
> Flink's structured type extraction logic (in
> DataTypeExtractor.createStructuredTypeAttributes()) falls back to
> alphabetical field ordering when no constructor is present, rather than
> preserving the order of field declarations. This causes a mismatch between
> the expected field layout and the actual memory layout when creating
> structured objects.
> h4. Current Behavior:
> {code:java}
> // Without constructor - fields are ordered alphabetically: [age, name]
> public static class User {
> public String name; // Declared first
> public int age; // Declared second
> } {code}
> When using:
> {code:java}
> SELECT OBJECT_OF('com.example.User', 'name', 'Alice', 'age', 30) {code}
> The system creates a structured type with field order [age, name]
> (alphabetical), but assigns values in the order [name, age] (as specified),
> resulting in memory corruption. It outputs:
> {code:java}
> User info: +I[User{name=' Alice � ',
> age=1667853377}]{code}
> h4. Workaround:
> Add a fully defining constructor that explicitly defines field order:
> {code:java}
> public static class User {
> public String name;
> public int age;
>
> // Fully defining constructor enforces field order
> public User(String name, int age) {
> this.name = name;
> this.age = age;
> }
> } {code}
> h4. Analysis:
> The issue is in
> [flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
> at lines
> 655-668|https://github.com/raminqaf/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java#L669-L669]:
> {code:java}
> private DataTypes.Field[] createStructuredTypeAttributes(
> ExtractionUtils.AssigningConstructor constructor,
> Map<String, DataType> fieldDataTypes) {
> return Optional.ofNullable(constructor)
> .map(c -> c.parameterNames.stream()) // Uses constructor order
> .orElseGet(() -> fieldDataTypes.keySet().stream().sorted()) // ←
> PROBLEM: Alphabetical sort
> .map(name -> DataTypes.FIELD(name, fieldDataTypes.get(name)))
> .toArray(DataTypes.Field[]::new);
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)