Yunhong Zheng created FLINK-32897:
-------------------------------------
Summary: Kafka with nested array row type will get wrong result
Key: FLINK-32897
URL: https://issues.apache.org/jira/browse/FLINK-32897
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.18.0
Reporter: Yunhong Zheng
Fix For: 1.19.0
Attachments: image-2023-08-21-08-48-25-116.png
{code:java}
// code placeholder
CREATE TABLE kafkaTest (
a STRING NOT NULL,
config ARRAY<ROW<notificationTypeOptInReference STRING NOT NULL,cso STRING
NOT NULL,autocreate BOOLEAN NOT NULL> NOT NULL> NOT NULL,
ingestionTime TIMESTAMP(3) METADATA FROM 'timestamp',
PRIMARY KEY (businessEvent) NOT ENFORCED)
WITH (
'connector' = 'kafka',
'topic' = 'test',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = ';',
'value.format' = 'json',
'sink.partitioner' = 'fixed'
); {code}
If we with the following INSERT, it will see that the last item in the array is
placed in the topic 3 times and the first two are igniored.
{code:java}
// code placeholder
INSERT INTO kafkaTest VALUES ('Transaction', ARRAY[ROW('G', 'IT',
true),ROW('H', 'FR', true), ROW('I', 'IT', false)], TIMESTAMP '2023-08-30
14:01:00'); {code}
The result:
!image-2023-08-21-08-48-25-116.png|width=296,height=175!
If I use the 'print' sink, I can get the right result. So I think this is a bug
of 'kafka' connector.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)