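It looks like you are using the flink-1.11 syntax. In 1.10 the connector options use prefixed keys such as 'connector.type' and 'format.type' rather than 'connector' and 'format', so the 1.10.2 factories cannot match your table. Try rewriting the DDL in the flink-1.10 syntax; see the documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector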
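
As a rough sketch, the table from your message might look like this in the 1.10 style. The topic, broker address and columns are copied from your DDL. The 'connector.version' and 'connector.startup-mode' values are my assumptions (I picked 'universal' because KafkaTableSourceSinkFactory appears in your list of considered factories), so adjust them to your setup. I am also assuming the flink-csv format jar is on the SQL client classpath.

```sql
-- Flink 1.10-style DDL: option keys carry the 'connector.' / 'format.' prefix.
CREATE TABLE tx (
    account_id BIGINT,
    amount BIGINT,
    transaction_time TIMESTAMP(3),
    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    -- Assumption: the universal Kafka connector; use '0.10' or '0.11' for older brokers.
    'connector.version' = 'universal',
    'connector.topic' = 'heli01',
    'connector.properties.bootstrap.servers' = '10.100.51.56:9092',
    -- Assumption: read from the beginning of the topic; not part of the original DDL.
    'connector.startup-mode' = 'earliest-offset',
    'format.type' = 'csv'
);
```

If the same exception still shows up after this change, compare the "requested" properties in the error with the ones the Kafka factory expects; that usually shows which key is still off.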

xinghalo
xingh...@163.com

On 2020-09-28 09:16, hl9...@126.com <hl9...@126.com> wrote:

Flink version 1.10.2. The problem can be reproduced as follows. Could anyone tell me what the cause is?

./sql-client.sh embedded

Flink SQL> show tables ;
[INFO] Result was empty.

Flink SQL> CREATE TABLE tx (
    account_id BIGINT,
    amount BIGINT,
    transaction_time TIMESTAMP(3),
    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'heli01',
    'properties.bootstrap.servers' = '10.100.51.56:9092',
    'format' = 'csv'
);
[INFO] Table has been created.

Flink SQL> select * from tx ;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector=kafka
format=csv
properties.bootstrap.servers=10.100.51.56:9092
schema.0.data-type=BIGINT
schema.0.name=account_id
schema.1.data-type=BIGINT
schema.1.name=amount
schema.2.data-type=TIMESTAMP(3)
schema.2.name=transaction_time
schema.watermark.0.rowtime=transaction_time
schema.watermark.0.strategy.data-type=TIMESTAMP(3)
schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND
topic=heli01

The following factories have been considered:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory

hl9...@126.com