Hi: 我Flink消费kafka的数据,我创建一张表如下: val kafkaSource = """ |create table kafka_order( |order_id string, |order_price decimal(10,2), |order_time timestamp(3) |) |with( |'connector' = 'kafka', |'topic' = 'iceberg.order', |'properties.bootstrap.servers' = 'hostname:9092', |'format' = 'json', |'properties.group.id' = 'data-lake', |'scan.startup.mode' = 'earliest-offset', |'json.ignore-parse-errors' = 'false' |) |""".stripMargin tEnv.executeSql(kafkaSource)
我直接查询后print到控制台时,没法消费成功,如下: val query = """ |select * from kafka_order |""".stripMargin tEnv.executeSql(query).print() 但是我创建一个print的connect,然后insert into 表 select * from kafka_order这样是可以正常消费的,如下: val print = """ |create table p_order( |order_id string, |order_price decimal(10,2), |order_time timestamp(3) |) |with( |'connector' = 'print' |) |""".stripMargin tEnv.executeSql(print) val query = """ |insert into p_order |select * from kafka_order |""".stripMargin tEnv.executeSql(query) 这具体是为什么呢?望知道的大佬告知一下,感激不尽。 祝好!