David Perkins created FLINK-35726:
-------------------------------------

             Summary: Data Stream to Table API converts Map to RAW 
'java.util.Map'
                 Key: FLINK-35726
                 URL: https://issues.apache.org/jira/browse/FLINK-35726
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.17.2
            Reporter: David Perkins


We have a use case where we convert from the Table API to a Data Stream with a 
class, perform some operations, and then convert back to the Table API. When 
the data contains a Map, the conversion back to the Table API converts the Map 
to {{RAW('java.util.Map', '...')}}. This causes an 'Incompatible types for 
sink column' exception.

In this particular case, the Map contains the Kafka headers, which we need to 
preserve and write to the output topic. Both topics/table definitions use the 
same schema. We have set a {{DataTypeHint}} annotation on the Map field in the 
Java class. We are currently working around this issue by using a UDF to simply 
perform a type conversion from the RAW Java Map to the Table API Map.
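The workaround UDF looks roughly like the following (a minimal sketch; the class and method names are illustrative, and the unchecked cast assumes the RAW value is still the original java.util.Map instance):

```java
import java.util.Map;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical workaround UDF: accepts the RAW value via an ANY input group
// and re-declares it as a proper Table API MAP<STRING, BYTES> through the
// output type hint.
public class RawToMapFunction extends ScalarFunction {

    @SuppressWarnings("unchecked")
    public @DataTypeHint("MAP<STRING, BYTES>") Map<String, byte[]> eval(
            @DataTypeHint(inputGroup = InputGroup.ANY) Object raw) {
        // Assumes the RAW value wraps the original java.util.Map instance.
        return (Map<String, byte[]>) raw;
    }
}
```

The function would then be registered with something like {{tableEnv.createTemporarySystemFunction("RAW_TO_MAP", RawToMapFunction.class)}} and applied to the headers column before inserting into the sink.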

Note that if no operations are performed on the stream, it works correctly, 
but adding a simple identity map causes the exception.

Here's a simple example to reproduce the problem.

CREATE TABLE Source (
    id STRING,
    headers MAP<STRING, BYTES> METADATA
) WITH (
    'connector' = 'kafka',
    'topic' = 'source',
    'properties.bootstrap.servers' = 'kafka-bootstrap-server',
    'format' = 'json'
);

CREATE TABLE Target (
    id STRING,
    headers MAP<STRING, BYTES> METADATA
) WITH (
    'connector' = 'kafka',
    'topic' = 'target',
    'properties.bootstrap.servers' = 'kafka-bootstrap-server',
    'format' = 'json'
);

public class MyRecord {
    private String id;

    @DataTypeHint(value = "MAP<STRING, BYTES>")
    private Map<String, byte[]> headers;
    ...
}

public class MyJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        Table sourceTable = tableEnv.from("Source");
        var sourceStream = tableEnv.toDataStream(sourceTable, MyRecord.class);
        var mappedStream = sourceStream.map(row -> row);
        Table outputTable = tableEnv.fromDataStream(mappedStream);

        tableEnv.createStatementSet()
                .add(outputTable.insertInto("Target"))
                .attachAsDataStream();
        streamEnv.executeAsync("Table Datastream test");
    }
}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
