Hi Flink team,
I am trying to use a temporal join with two Kafka-based streams ("tables"),
where the key is formatted as JSON and the value is Avro, serialized via the
Confluent Schema Registry.
When I run the join, I get the following error: "*The Kafka table
'default_catalog.default_database.sensor_readings' with 'avro-confluent'
format doesn't support defining PRIMARY KEY constraint on the table,
because it can't guarantee the semantic of primary key*."
This seems strange, because the primary key is not taken from the
avro-confluent value but from the message key, which is in JSON format.
The DDLs are:
```
CREATE TABLE sensor_readings (
    kafka_key_device_id VARCHAR PRIMARY KEY NOT ENFORCED,
    co DOUBLE,
    humidity DOUBLE,
    motion BOOLEAN,
    temp DOUBLE,
    ampere_hour DOUBLE,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'sensor.readings.json_key3.sr',
    'properties.bootstrap.servers' = '{cc_config["bootstrap.servers"]}',
    'properties.group.id' = 'device.tumbling.w.sr.sql',
    'scan.startup.mode' = 'earliest-offset',
    'properties.auto.offset.reset' = 'earliest',
    'key.format' = 'json',
    'key.fields' = 'kafka_key_device_id',
    'key.fields-prefix' = 'kafka_key_',
    'value.fields-include' = 'EXCEPT_KEY',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '{sr_config["url"]}'
)

CREATE TABLE device_account_stats (
    kafka_key_device_id VARCHAR PRIMARY KEY NOT ENFORCED,
    metric_1 DOUBLE,
    metric_2 INTEGER,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'device_account.metrics.json_key3.sr',
    'properties.bootstrap.servers' = '{cc_config["bootstrap.servers"]}',
    'properties.group.id' = 'device.tumbling.w.sr.sql',
    'scan.startup.mode' = 'earliest-offset',
    'properties.auto.offset.reset' = 'earliest',
    'key.format' = 'json',
    'key.fields' = 'kafka_key_device_id',
    'key.fields-prefix' = 'kafka_key_',
    'value.fields-include' = 'EXCEPT_KEY',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '{sr_config["url"]}'
)
```
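To make my reasoning concrete, here is a simplified Python model (my own illustration, not Flink's code) of how I understand these options split the physical columns of sensor_readings, assuming the documented semantics of 'key.fields', 'key.fields-prefix' and 'value.fields-include' = 'EXCEPT_KEY'. The function name split_columns is hypothetical:

```python
# Simplified model (not Flink's code) of how the Kafka connector options
# split a table's physical columns between the key and value formats.
from typing import List, Tuple


def split_columns(
    physical_columns: List[str],
    key_fields: List[str],
    key_prefix: str,
) -> Tuple[List[str], List[str]]:
    """Return (key format field names, value format field names).

    Assumes 'value.fields-include' = 'EXCEPT_KEY', which is required
    whenever 'key.fields-prefix' is set.
    """
    # Inside the key format (JSON here) the prefix is stripped from names.
    key_format_fields = [
        f[len(key_prefix):] if f.startswith(key_prefix) else f
        for f in key_fields
    ]
    # EXCEPT_KEY: the value format (avro-confluent) gets all other columns.
    value_format_fields = [c for c in physical_columns if c not in key_fields]
    return key_format_fields, value_format_fields


# Physical columns of sensor_readings; the computed column proctime is
# not serialized by either format, so it is left out.
SENSOR_READINGS = [
    "kafka_key_device_id", "co", "humidity", "motion",
    "temp", "ampere_hour", "ts",
]

key_part, value_part = split_columns(
    SENSOR_READINGS, ["kafka_key_device_id"], "kafka_key_"
)
print(key_part)    # ['device_id'] -> JSON message key
print(value_part)  # ['co', 'humidity', 'motion', 'temp', 'ampere_hour', 'ts']
```

If this model is right, the primary key column never passes through the avro-confluent format at all.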
The join query is:
```
tumbling_w_sql = """
    SELECT
        sr.kafka_key_device_id AS device_id,
        das.metric_1,
        das.metric_2,
        TUMBLE_START(sr.proctime, INTERVAL '30' SECONDS) AS window_start,
        TUMBLE_END(sr.proctime, INTERVAL '30' SECONDS) AS window_end,
        SUM(sr.ampere_hour) AS charge_consumed
    FROM sensor_readings sr
    JOIN device_account_stats FOR SYSTEM_TIME AS OF sr.proctime AS das
        ON sr.kafka_key_device_id = das.kafka_key_device_id
    GROUP BY
        TUMBLE(sr.proctime, INTERVAL '30' SECONDS),
        sr.kafka_key_device_id,
        das.metric_1,
        das.metric_2
"""
```
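For reference, this is how I understand the 30-second TUMBLE windows in the query assign rows, as a minimal Python sketch assuming epoch-aligned windows with no offset. The helpers tumble and sum_per_window and the sample rows are hypothetical, not Flink API, and the grouping is simplified to device_id and window only:

```python
# Simplified model (not Flink's code) of 30-second tumbling windows as used
# by TUMBLE / TUMBLE_START / TUMBLE_END.
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(seconds=30)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumble(ts: datetime, size: timedelta = WINDOW):
    """Return (window_start, window_end) of the epoch-aligned window holding ts."""
    start = ts - (ts - EPOCH) % size  # round down to a multiple of `size`
    return start, start + size


def sum_per_window(rows):
    """SUM(ampere_hour) grouped by (device_id, window); joined metrics ignored."""
    totals = defaultdict(float)
    for device_id, ts, ampere_hour in rows:
        window_start, window_end = tumble(ts)
        totals[(device_id, window_start, window_end)] += ampere_hour
    return dict(totals)


# Hypothetical rows: (device_id, proctime, ampere_hour)
t0 = datetime(2021, 5, 10, 12, 0, 47, tzinfo=timezone.utc)
rows = [
    ("dev-1", t0, 1.5),
    ("dev-1", t0 + timedelta(seconds=10), 2.0),  # 12:00:57, same window
    ("dev-1", t0 + timedelta(seconds=20), 0.5),  # 12:01:07, next window
]
for key, total in sum_per_window(rows).items():
    print(key[0], key[1].time(), key[2].time(), total)
```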
If I remove the primary keys, I get a different error: "*Temporal Table Join requires
primary key in versioned table, but no primary key can be found*".
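My understanding of why the primary key is required: the versioned table is a history of versions per key, and each probe row looks up the version of its key that was valid at the probe row's time. A conceptual sketch (not Flink's implementation), assuming integer millisecond timestamps and rows as dicts; build_versions, lookup and the sample changelog are hypothetical:

```python
# Conceptual model (not Flink's implementation) of a temporal join against a
# versioned table: each probe row looks up the version of the row with the
# same primary key that was valid at the probe row's time.
from bisect import bisect_right
from typing import Dict, List, Optional, Tuple

# (valid_from timestamp in ms, row); hypothetical representation
Version = Tuple[int, dict]


def build_versions(changelog: List[Tuple[str, int, dict]]) -> Dict[str, List[Version]]:
    """Group the versioned table's changelog by primary key, ordered by time."""
    versions: Dict[str, List[Version]] = {}
    for key, ts, row in changelog:
        versions.setdefault(key, []).append((ts, row))
    for history in versions.values():
        history.sort(key=lambda item: item[0])
    return versions


def lookup(versions: Dict[str, List[Version]], key: str, as_of: int) -> Optional[dict]:
    """Return the latest version of `key` with timestamp <= as_of, or None."""
    history = versions.get(key, [])
    times = [ts for ts, _ in history]
    idx = bisect_right(times, as_of)
    return history[idx - 1][1] if idx else None


# Hypothetical device_account_stats changelog: (device_id, ts_ms, row)
changelog = [
    ("dev-1", 1000, {"metric_1": 0.5, "metric_2": 3}),
    ("dev-1", 5000, {"metric_1": 0.9, "metric_2": 4}),
]
versions = build_versions(changelog)

print(lookup(versions, "dev-1", 3000))  # {'metric_1': 0.5, 'metric_2': 3}
print(lookup(versions, "dev-1", 6000))  # {'metric_1': 0.9, 'metric_2': 4}
print(lookup(versions, "dev-1", 500))   # None: no version valid yet
```

Without a key there is no way to group rows into versions of the same entity, which is consistent with the second error message.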
Is it possible to perform a temporal join when the value is in Avro format?
Regards,
Olga