When you call env.execute(), the StreamExecutionEnvironment is reset, which clears all sources and transformations from it. That is why the subsequent env.getExecutionPlan() call fails: no operations remain, so a plan cannot be created.

You need to create the execution plan before calling execute().

// Capture the plan while the sources/transformations are still registered.
String executionPlan = env.getExecutionPlan();
env.execute();
logger.info("Started job; executionPlan={}", executionPlan);
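
To make the ordering issue concrete, here is a small plain-Java sketch. It is only a toy model of the behaviour described above, not Flink's actual implementation: ToyEnvironment, addOperator and PlanBeforeExecuteDemo are hypothetical names invented for illustration, and the only thing borrowed from Flink is the exception message from your stack trace.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for an execution environment (hypothetical; NOT Flink's class).
class ToyEnvironment {
    private final List<String> operators = new ArrayList<>();

    // Registers an operator, loosely like addSource/flatMap/addSink would.
    void addOperator(String name) {
        operators.add(name);
    }

    // Builds a plan from the operators registered so far; fails if none remain.
    String getExecutionPlan() {
        if (operators.isEmpty()) {
            throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
        }
        return String.join(" -> ", operators);
    }

    // "Submits" the job, then clears the registered operators (assumed reset step).
    void execute() {
        getExecutionPlan(); // submission also needs at least one operator
        operators.clear();
    }
}

class PlanBeforeExecuteDemo {
    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment();
        env.addOperator("source");
        env.addOperator("sink");

        // Correct order: read the plan first, then execute.
        String plan = env.getExecutionPlan();
        env.execute();
        System.out.println("plan=" + plan);

        // Wrong order: asking for the plan after execute() throws.
        try {
            env.getExecutionPlan();
        } catch (IllegalStateException e) {
            System.out.println("after execute: " + e.getMessage());
        }
    }
}
```

In this model the job itself is submitted successfully either way, which matches what you observed: the application runs fine and only the later plan lookup fails.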

On 14/02/2022 17:40, Shane Bishop wrote:
Hi all,

My team has started seeing the error "java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute." However, even with this error, the Flink application starts and runs fine, and the Flink job renders fine in the Flink Dashboard.

Attached is the full stacktrace.

This error comes from when we call StreamExecutionEnvironment#getExecutionPlan(). In the code snippet below, we call this method on the last line of the snippet.

From poking around online, I found https://stackoverflow.com/questions/54977290/flink-no-operators-defined-in-streaming-topology-cannot-execute, which suggests the problem could be that we don't set a sink, but in the code below you will see we do set a sink (just maybe not in a way that getExecutionPlan() expects).

Can this be safely ignored? Is there something we can do so that getExecutionPlan() will work properly, or otherwise fix/suppress this error?

Below is the code (some portions have been redacted):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<RedactedType> source =
    env.addSource(new RedactedType())
        .uid(<redacted>)
        .name(<redacted>)
        .shuffle();

DataStream<Tuple3<Long, String, byte[]>> stream =
    AsyncDataStream.unorderedWait(source, new RedactedType(), 10000, TimeUnit.MILLISECONDS)
        .uid(<redacted>)
        .name(<redacted>);

stream
    .flatMap(new RedactedType())
    .uid(<redacted>)
    .name(<redacted>)
    .flatMap(new RedactedType())
    .uid(<redacted>)
    .name(<redacted>)
    .shuffle()
    .addSink(new RedactedType()) // Set sink
    .uid(<redacted>)
    .name(<redacted>);

env.execute("<Redacted job name>");
logger.info("Started job; executionPlan={}", env.getExecutionPlan()); // line 66

Thanks,
Shane
