[
https://issues.apache.org/jira/browse/FLINK-28672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
songwenbin updated FLINK-28672:
---
Description:
# 配置了'properties.allow.auto.create.topics' =
'false',但job执行后都会创建出mocktest27的topic
CREATE TABLE interval_source7 (
item STRING,
val DOUBLE,
is_energy BIGINT,
ts BIGINT,
event_time AS TO_TIMESTAMP_LTZ(ts, 3),
process_time AS PROCTIME(),
WATERMARK FOR event_time AS event_time)
WITH ('connector' = 'kafka', 'topic' = 'daily-consumption',
'properties.bootstrap.servers' = 'broker:9092','properties.group.id' =
'l3_daily-consumption_mocktest','value.format' =
'avro-confluent','value.avro-confluent.schema-registry.url' =
'http://schema-registry:8081','value.fields-include' = 'EXCEPT_KEY',
'scan.startup.mode' = 'earliest-offset');
CREATE TABLE interval_consumption_sink7 (
item STRING,
val DOUBLE,
ts BIGINT,
isEnergy BIGINT,
nodeId INTEGER,
siteId INTEGER,
shiftId INTEGER,
shiftTime BIGINT,
PRIMARY KEY (item, ts) NOT ENFORCED)
WITH ('connector' = 'upsert-kafka','topic' =
'mocktest27','properties.allow.auto.create.topics' = 'false'
,'properties.bootstrap.servers' = 'broker:9092', 'key.format' =
'avro-confluent', 'key.avro-confluent.schema-registry.url' =
'http://schema-registry:8081','value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://schema-registry:8081');
INSERT INTO interval_consumption_sink7
SELECT
item,
val,
ts,
1 AS is_energy,
2,
3,
4,
1
FROM interval_source7 ;
was:
配置了'properties.allow.auto.create.topics' = 'false',但job执行后都会创建出mocktest27的topic
CREATE TABLE interval_source7 (
item STRING,
val DOUBLE,
is_energy BIGINT,
ts BIGINT,
event_time AS TO_TIMESTAMP_LTZ(ts, 3),
process_time AS PROCTIME(),
WATERMARK FOR event_time AS event_time)
WITH ('connector' = 'kafka', 'topic' = 'daily-consumption',
'properties.bootstrap.servers' = 'broker:9092','properties.group.id' =
'l3_daily-consumption_mocktest','value.format' =
'avro-confluent','value.avro-confluent.schema-registry.url' =
'http://schema-registry:8081','value.fields-include' = 'EXCEPT_KEY',
'scan.startup.mode' = 'earliest-offset');
CREATE TABLE interval_consumption_sink7 (
item STRING,
val DOUBLE,
ts BIGINT,
isEnergy BIGINT,
nodeId INTEGER,
siteId INTEGER,
shiftId INTEGER,
shiftTime BIGINT,
PRIMARY KEY (item, ts) NOT ENFORCED)
WITH ('connector' = 'upsert-kafka','topic' =
'mocktest27','properties.allow.auto.create.topics' = 'false'
,'properties.bootstrap.servers' = 'broker:9092', 'key.format' =
'avro-confluent', 'key.avro-confluent.schema-registry.url' =
'http://schema-registry:8081','value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://schema-registry:8081');
INSERT INTO interval_consumption_sink7
SELECT
item,
val,
ts,
1 AS is_energy,
2,
3,
4,
1
FROM interval_source7 ;
> properties.allow.auto.create.topics disable
> ---
>
> Key: FLINK-28672
> URL: https://issues.apache.org/jira/browse/FLINK-28672
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
>Affects Versions: 1.13.6
>Reporter: songwenbin
>Priority: Major
>
> # 配置了'properties.allow.auto.create.topics' =
> 'false',但job执行后都会创建出mocktest27的topic
> CREATE TABLE interval_source7 (
> item STRING,
> val DOUBLE,
> is_energy BIGINT,
> ts BIGINT,
> event_time AS TO_TIMESTAMP_LTZ(ts, 3),
> process_time AS PROCTIME(),
> WATERMARK FOR event_time AS event_time)
> WITH ('connector' = 'kafka', 'topic' = 'daily-consumption',
> 'properties.bootstrap.servers' = 'broker:9092','properties.group.id' =
> 'l3_daily-consumption_mocktest','value.format' =
> 'avro-confluent','value.avro-confluent.schema-registry.url' =
> 'http://schema-registry:8081','value.fields-include' = 'EXCEPT_KEY',
> 'scan.startup.mode' = 'earliest-offset');
> CREATE TABLE interval_consumption_sink7 (
> item STRING,
> val DOUBLE,
> ts BIGINT,
> isEnergy BIGINT,
> nodeId INTEGER,
> siteId INTEGER,
> shiftId INTEGER,
> shiftTime BIGINT,
> PRIMARY KEY (item, ts) NOT ENFORCED)
> WITH ('connector' = 'upsert-kafka','topic' =
> 'mocktest27','properties.allow.auto.create.topics' = 'false'