是Flink1.12的,kafka消费端能读取到数据,但是下面的代码无法读取到数据,运行后没有报错也没有输出,求助,谢谢



import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.math.Ordering.Int



object FlinkKafkaDDLDemo
{

def main(args: Array[String]): Unit =
{

val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(3)



val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
val tEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

val createTable =
"""
              |CREATE TABLE PERSON (

              |    name VARCHAR COMMENT '姓名',

              |    age VARCHAR COMMENT '年龄',

              |    city VARCHAR COMMENT '所在城市',

              |    address VARCHAR COMMENT '家庭住址',

              |    ts TIMESTAMP(3) COMMENT '时间戳'

              |)

              |WITH (

              |    'connector.type' = 'kafka', -- 使用 kafka connector

              |    'connector.version' = 'universal',  -- kafka 版本

              |    'connector.topic' = 'kafka_ddl',  -- kafka topic

              |    'connector.startup-mode' = 'earliest-offset', -- 从最早的 offset 
开始读取

              |    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息

              |    'connector.properties.0.value' = 'Desktop:2181',

              |    'connector.properties.1.key' = 'bootstrap.servers',

              |    'connector.properties.1.value' = 'Desktop:9091',

              |    'update-mode' = 'append',

              |    'format.type' = 'json',  -- 数据源格式为 json

              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则

              |)

            """.stripMargin



tEnv.executeSql(createTable)



val query: String ="""SELECT name,COUNT(age) FROM PERSON GROUP BY 
name""".stripMargin



val result: Table = tEnv.sqlQuery(query)

tEnv.toRetractStream[Row](result).print()
//    tEnv.execute("Flink SQL DDL")

}

}

Reply via email to