A question for the experts here: on Flink 1.15.4, resultTable.executeInsert("active_users_bm") shows up in the execution plan as a Sink operation, so why does the job fail? The error is as follows:

No operators defined in streaming topology. Cannot execute.
java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:2018)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2009)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1994)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1833)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801)

The code I am running locally is as follows:

val settings = EnvironmentSettings.newInstance.inStreamingMode.build
val tableEnv = StreamTableEnvironment.create(env, settings)
val inputTable = tableEnv.fromDataStream(source)
val ddl =
  """
    |CREATE TABLE active_users_bm (
    |  `dt` INT,
    |  `uid` BIGINT
    | ) WITH (
    |  'load-url' = 'xxxx:8030',
    |  'jdbc-url' = 'jdbc:mysql://xxxx:9030',
    |  'connector' = 'starrocks',
    |  'sink.properties.columns' = 'dt, uid, ubm=to_bitmap(uid)',
    |  'database-name' = 'xx',
    |  'table-name' = 'active_users_bm',
    |  'username' = 'user',
    |  'password' = 'xxx'
    | )
    |""".stripMargin
// Register the StarRocks sink table in the Table environment
tableEnv.executeSql(ddl)
// Run the query
val resultTable = tableEnv.sqlQuery(s"SELECT 1 dt, 1 uid FROM $inputTable")
// Write the result table to the external system
resultTable.executeInsert("active_users_bm")
// resultTable.execute().print()
// Execute the program
env.execute("Flink Table to External System")

Execution plan log:

[Source: xxx-source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=*xxx* NOT NULL, rowtime=false, watermark=false) -> Calc(select=[1 AS dt, 1:BIGINT AS uid]) -> Sink: Sink(table=[default_catalog.default_database.active_users_bm], fields=[dt, uid]) (1/1)#0] WARN org.apache.flink.metrics.MetricGroup - The operator name Sink: Sink(table=[default_catalog.default_database.active_users_bm], fields=[dt, uid]) exceeded the 80 characters length limit and was truncated.