David Perkins created FLINK-35726:
-------------------------------------
Summary: Data Stream to Table API converts Map to RAW 'java.util.Map'
Key: FLINK-35726
URL: https://issues.apache.org/jira/browse/FLINK-35726
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.17.2
Reporter: David Perkins
We have a use case where we convert from the Table API to a Data Stream with a
class, perform some operations, and then convert back to the Table API. When
the data contains a Map, the conversion back to the Table API converts the Map
to {{RAW('java.util.Map', '...')}}. This causes an 'Incompatible types for
sink column' exception.
In this particular case, the Map contains the Kafka headers, which we need to
preserve and write to the output topic. Both topics/table definitions use the
same schema. We have set a {{DataTypeHint}} annotation on the Map field in the
Java class. We are currently working around this issue by using a UDF to simply
perform a type conversion from the RAW Java Map to the Table API Map.
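For reference, such a workaround UDF is essentially an identity function whose result-type hint forces the planner to treat the RAW value as a proper MAP again. A minimal sketch (the class name {{RawToHeadersMap}} is illustrative, not the actual code from our job; it relies on {{InputGroup.ANY}} so the function accepts the RAW input):

```java
import java.util.Map;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Identity UDF: accepts the RAW('java.util.Map', ...) value and
 * re-declares it as MAP<STRING, BYTES> via the result type hint.
 */
public class RawToHeadersMap extends ScalarFunction {

    @SuppressWarnings("unchecked")
    public @DataTypeHint("MAP<STRING, BYTES>") Map<String, byte[]> eval(
            @DataTypeHint(inputGroup = InputGroup.ANY) Object headers) {
        return (Map<String, byte[]>) headers;
    }
}
```

The function is registered with {{tableEnv.createTemporarySystemFunction("RawToHeadersMap", RawToHeadersMap.class)}} and applied to the headers column before inserting into the sink.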
One note is that if no operations are performed on the stream, it works
correctly. But adding a simple identity map causes the exception.
Here's a simple example to reproduce the problem.
CREATE TABLE Source (
    id STRING,
    headers MAP<STRING, BYTES> METADATA
) WITH (
    'connector' = 'kafka',
    'topic' = 'source',
    'properties.bootstrap.servers' = 'kafka-bootstrap-server',
    'format' = 'json'
);

CREATE TABLE Target (
    id STRING,
    headers MAP<STRING, BYTES> METADATA
) WITH (
    'connector' = 'kafka',
    'topic' = 'target',
    'properties.bootstrap.servers' = 'kafka-bootstrap-server',
    'format' = 'json'
);
public class MyRecord {
    private String id;

    @DataTypeHint(value = "MAP<STRING, BYTES>")
    private Map<String, byte[]> headers;
    ...
}
public class MyJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(streamEnv);

        Table sourceTable = tableEnv.from("Source");
        var sourceStream = tableEnv.toDataStream(sourceTable, MyRecord.class);
        var mappedStream = sourceStream.map(row -> row);
        Table outputTable = tableEnv.fromDataStream(mappedStream);

        tableEnv.createStatementSet()
                .add(outputTable.insertInto("Target"))
                .attachAsDataStream();
        streamEnv.executeAsync("Table Datastream test");
    }
}
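A possible alternative to the UDF workaround (an untested sketch, not something we have verified in our job) is to pass an explicit {{Schema}} to {{fromDataStream}}, so the planner does not have to infer the map type from the stream's erased type information:

```java
// Hypothetical variant of the fromDataStream call above: declare the
// columns explicitly instead of relying on type extraction.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

Table outputTable = tableEnv.fromDataStream(
        mappedStream,
        Schema.newBuilder()
                .column("id", DataTypes.STRING())
                .column("headers",
                        DataTypes.MAP(DataTypes.STRING(), DataTypes.BYTES()))
                .build());
```

Whether this avoids the RAW conversion in the failing case is unconfirmed; it is mentioned only as a possible direction.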