flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 需要实现TableSourceFactory,还是其他什么.
提示: Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed. Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. 代码: ``` import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.table.api.{EnvironmentSettings, Types} import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} import org.apache.flink.types.Row object KafkaInDDL extends App { val env = StreamExecutionEnvironment.getExecutionEnvironment val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) val sourceDDL = """create table sourceTable( id int, name varchar ) with ( 'connector.type' = 'kafka', 'connector.property-version' = '1', 'update-mode' = 'append', 'bootstrap.servers' = '192.168.1.160:19092', 'connector.topic' = 'hbtest1', 'connector.startup-mode' = 'earliest-offset' ) """ tEnv.sqlUpdate(sourceDDL) tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print() tEnv.execute("") } ```