flink1.9 blink planner  table  使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 
需要实现TableSourceFactory,还是其他什么.


提示:  
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. findAndCreateTableSource failed.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.




代码:
```
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.{EnvironmentSettings, Types}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row


object KafkaInDDL extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings: EnvironmentSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
settings)


  val sourceDDL =
    """create table sourceTable(
                            id int,
                            name varchar
                          ) with (
                            'connector.type' = 'kafka',
                            'connector.property-version' = '1',
                            'update-mode' = 'append',
                            'bootstrap.servers' = '192.168.1.160:19092',
                            'connector.topic' = 'hbtest1',
                            'connector.startup-mode' = 'earliest-offset'
                          )
    """
  tEnv.sqlUpdate(sourceDDL)
  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
  tEnv.execute("")
}
```

回复