When you call env.execute(), the StreamExecutionEnvironment is reset,
which clears all sources and transformations from it.
That's why the later env.getExecutionPlan() call throws: no operations
remain, so a plan cannot be created.
You need to create the execution plan before calling execute().
String executionPlan = env.getExecutionPlan();
env.execute();
logger.info("Started job; executionPlan={}", executionPlan);
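To illustrate the reset behaviour described above, here is a minimal, self-contained toy model in plain Java. It is not Flink code: ToyEnvironment, addOperator() and the arrow-joined plan string are hypothetical stand-ins invented for this sketch, and only the exception message is copied from the report quoted below. It assumes nothing more than "execute() empties the list of registered operators".

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point: capture the plan first, then execute.
class PlanBeforeExecuteDemo {
    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment();
        env.addOperator("source");
        env.addOperator("flatMap");
        env.addOperator("sink");

        // Capture the plan while the operators are still registered.
        String executionPlan = env.getExecutionPlan();
        env.execute();
        System.out.println("Started job; executionPlan=" + executionPlan);

        // Asking again after execute() fails, because the list is now empty.
        try {
            env.getExecutionPlan();
        } catch (IllegalStateException e) {
            System.out.println("After execute(): " + e.getMessage());
        }
    }
}

// Hypothetical stand-in for StreamExecutionEnvironment (NOT the Flink class).
class ToyEnvironment {
    private final List<String> transformations = new ArrayList<>();

    // Registers an operator name; stands in for addSource/flatMap/addSink.
    void addOperator(String name) {
        transformations.add(name);
    }

    // Builds a textual plan, or throws if no operators are registered.
    String getExecutionPlan() {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
        }
        return String.join(" -> ", transformations);
    }

    // "Runs" the job: validates that a plan exists, then resets the environment.
    void execute() {
        getExecutionPlan();
        transformations.clear();
    }
}
```

The ordering in main() mirrors the three-line fix above: the plan string is stored in a local variable before execute() runs, so logging it afterwards no longer depends on the state of the environment.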
On 14/02/2022 17:40, Shane Bishop wrote:
Hi all,
My team has started seeing the error "java.lang.IllegalStateException:
No operators defined in streaming topology. Cannot execute." However,
even with this error, the Flink application starts and runs fine, and
the Flink job renders fine in the Flink Dashboard.
Attached is the full stacktrace.
This error comes from when we call
StreamExecutionEnvironment#getExecutionPlan(). In the code snippet
below, we call this method on the last line of the snippet.
From poking around online, I found
https://stackoverflow.com/questions/54977290/flink-no-operators-defined-in-streaming-topology-cannot-execute,
which suggests the problem could be that we don't set a sink, but in
the code below you will see we do set a sink (just maybe not in a way
that getExecutionPlan() expects).
Can this be safely ignored? Is there something we can do so that
getExecutionPlan() will work properly, or otherwise fix/suppress this
error?
Below is the code (some portions have been redacted):
final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<RedactedType> source =
    env.addSource(new RedactedType())
        .uid(<redacted>)
        .name(<redacted>)
        .shuffle();

DataStream<Tuple3<Long, String, byte[]>> stream =
    AsyncDataStream.unorderedWait(source, new RedactedType(), 10000,
            TimeUnit.MILLISECONDS)
        .uid(<redacted>)
        .name(<redacted>);

stream
    .flatMap(new RedactedType())
    .uid(<redacted>)
    .name(<redacted>)
    .flatMap(new RedactedType())
    .uid(<redacted>)
    .name(<redacted>)
    .shuffle()
    .addSink(new RedactedType()) // Set sink
    .uid(<redacted>)
    .name(<redacted>);

env.execute("<Redacted job name>");
logger.info("Started job; executionPlan={}", env.getExecutionPlan()); // line 66
Thanks,
Shane