是Flink1.12的,kafka消费端能读取到数据,但是下面的代码无法读取到数据,运行后没有报错也没有输出,求助,谢谢
import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.{EnvironmentSettings, Table} import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.types.Row import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import scala.math.Ordering.Int object FlinkKafkaDDLDemo { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(3) val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment val bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() val tEnv = StreamTableEnvironment.create(bsEnv, bsSettings) val createTable = """ |CREATE TABLE PERSON ( | name VARCHAR COMMENT '姓名', | age VARCHAR COMMENT '年龄', | city VARCHAR COMMENT '所在城市', | address VARCHAR COMMENT '家庭住址', | ts TIMESTAMP(3) COMMENT '时间戳' |) |WITH ( | 'connector.type' = 'kafka', -- 使用 kafka connector | 'connector.version' = 'universal', -- kafka 版本 | 'connector.topic' = 'kafka_ddl', -- kafka topic | 'connector.startup-mode' = 'earliest-offset', -- 从最早的 offset 开始读取 | 'connector.properties.0.key' = 'zookeeper.connect', -- 连接信息 | 'connector.properties.0.value' = 'Desktop:2181', | 'connector.properties.1.key' = 'bootstrap.servers', | 'connector.properties.1.value' = 'Desktop:9091', | 'update-mode' = 'append', | 'format.type' = 'json', -- 数据源格式为 json | 'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则 |) """.stripMargin tEnv.executeSql(createTable) val query: String ="""SELECT name,COUNT(age) FROM PERSON GROUP BY name""".stripMargin val result: Table = tEnv.sqlQuery(query) tEnv.toRetractStream[Row](result).print() // tEnv.execute("Flink SQL DDL") } }