A question for the experts here:
Flink 1.15.4
resultTable.executeInsert("active_users_bm")
The execution plan shows a Sink operator for this call, so why does the job fail? The error is as follows:
No operators defined in streaming topology. Cannot execute.
java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:2018)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2009)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1994)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1833)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801)


The code I run locally is as follows:
val settings = EnvironmentSettings.newInstance.inStreamingMode.build
val tableEnv = StreamTableEnvironment.create(env, settings)
val inputTable = tableEnv.fromDataStream(source)
val ddl = """
            |CREATE TABLE active_users_bm (
            |                                `dt` INT,
            |                                `uid` BIGINT
            |    ) WITH (
            |       'load-url' = 'xxxx:8030',
            |       'jdbc-url' = 'jdbc:mysql://xxxx:9030',
            |       'connector' = 'starrocks',
            |       'sink.properties.columns' = 'dt, uid, ubm=to_bitmap(uid)',
            |       'database-name' = 'xx',
            |       'table-name' = 'active_users_bm',
            |       'username' = 'user',
            |       'password' = 'xxx'
            |    )
            |""".stripMargin

// Register the sink table in the Table environment
tableEnv.executeSql(ddl)
// Run the query
val resultTable = tableEnv.sqlQuery(s"SELECT 1 dt, 1 uid FROM $inputTable")
// Write the result table to the external system
resultTable.executeInsert("active_users_bm")
// resultTable.execute().print()
// Execute the program
env.execute("Flink Table to External System")



Execution plan log:
[Source: xxx-source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=*xxx* NOT NULL, rowtime=false, watermark=false) -> Calc(select=[1 AS dt, 1:BIGINT AS uid]) -> Sink: Sink(table=[default_catalog.default_database.active_users_bm], fields=[dt, uid]) (1/1)#0] WARN org.apache.flink.metrics.MetricGroup - The operator name Sink: Sink(table=[default_catalog.default_database.active_users_bm], fields=[dt, uid]) exceeded the 80 characters length limit and was truncated.
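
A possible explanation, offered as a sketch rather than a confirmed diagnosis: in the Table API, `executeInsert` submits the job by itself, which would explain why the Sink shows up in the plan log. The operators translated from the Table pipeline are then no longer pending in the `StreamExecutionEnvironment`, so a later `env.execute(...)` finds an empty topology and throws this `IllegalStateException`. The variant below drops `env.execute` and waits on the `TableResult` instead. The object name `ActiveUsersJob`, the `fromElements` source, and the built-in `blackhole` connector are stand-ins I chose so the sketch does not need the StarRocks connector; they are not from the original job.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Hypothetical job object, for illustration only.
object ActiveUsersJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance.inStreamingMode.build
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Placeholder source standing in for the real `source` stream.
    val source: DataStream[Long] = env.fromElements(1L, 2L, 3L)
    val inputTable = tableEnv.fromDataStream(source)

    // Assumption: the built-in 'blackhole' connector replaces the
    // StarRocks sink so that the sketch runs without external systems.
    tableEnv.executeSql(
      """
        |CREATE TABLE active_users_bm (
        |  `dt` INT,
        |  `uid` BIGINT
        |) WITH (
        |  'connector' = 'blackhole'
        |)
        |""".stripMargin)

    val resultTable = tableEnv.sqlQuery(s"SELECT 1 dt, 1 uid FROM $inputTable")

    // executeInsert submits the job; await() blocks until it finishes
    // (for an unbounded source it blocks until the job is cancelled).
    resultTable.executeInsert("active_users_bm").await()

    // No env.execute(...) here: nothing is left on the DataStream side to run.
  }
}
```

If the program also attaches sinks directly to a DataStream, `env.execute` is still needed for that part; it only fails when no DataStream operators remain.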
