It looks like you are using Flink 1.11 syntax.
Try rewriting the DDL with the Flink 1.10 syntax instead; see the documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
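For example, a rough sketch of the same table in 1.10-style DDL (the 'connector.*' / 'format.type' keys are the ones documented for 1.10; 'universal' for connector.version is my assumption, pick whatever matches your Kafka cluster):

CREATE TABLE tx (
  account_id  BIGINT,
  amount      BIGINT,
  transaction_time TIMESTAMP(3),
  WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
) WITH (
  'connector.type'    = 'kafka',        -- 1.10 uses connector.type / format.type keys
  'connector.version' = 'universal',    -- assumption: universal Kafka connector
  'connector.topic'   = 'heli01',
  'connector.properties.bootstrap.servers' = '10.100.51.56:9092',
  'format.type'       = 'csv'
);

With those keys the KafkaTableSourceSinkFactory should match the requested properties instead of throwing NoMatchingTableFactoryException.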


On 2020-09-28 09:16, hl9...@126.com <hl9...@126.com> wrote:
Flink version is 1.10.2. The problem reproduces as follows; could anyone tell me what is causing it?

./sql-client.sh  embedded
Flink SQL> show tables ;
[INFO] Result was empty.

Flink SQL> CREATE TABLE tx (
account_id  BIGINT,
amount      BIGINT,
transaction_time TIMESTAMP(3),
WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic'     = 'heli01',
'properties.bootstrap.servers' = '10.100.51.56:9092',
'format'    = 'csv'
);
[INFO] Table has been created.

Flink SQL> select * from tx ;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector=kafka
format=csv
properties.bootstrap.servers=10.100.51.56:9092
schema.0.data-type=BIGINT
schema.0.name=account_id
schema.1.data-type=BIGINT
schema.1.name=amount
schema.2.data-type=TIMESTAMP(3)
schema.2.name=transaction_time
schema.watermark.0.rowtime=transaction_time
schema.watermark.0.strategy.data-type=TIMESTAMP(3)
schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND
topic=heli01

The following factories have been considered:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory



hl9...@126.com
