JobListener.onJobExecuted() is only invoked in ExecutionEnvironment.execute() and ContextEnvironment.execute(). If neither of these is in the call chain with that setup, the listener will not be invoked.

Also, this would only happen on the client, not on the broker (in your case) or the server (JobManager).
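
To illustrate the point above, here is a minimal plain-Java model of the call chain. It is a sketch under stated assumptions, not Flink code: Graph, Cluster and Environment are hypothetical stand-ins invented for this example, and the only property they share with Flink is the one described above, namely that listeners are notified from inside execute(), in the client process.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ListenerCallChainSketch {

    /** Hypothetical stand-in for a compiled job graph. */
    static final class Graph {
        final String name;
        Graph(String name) { this.name = name; }
    }

    /** Hypothetical stand-in for the cluster: it runs graphs and knows nothing about listeners. */
    static final class Cluster {
        final List<String> executed = new ArrayList<>();

        String run(Graph graph) {
            executed.add(graph.name);
            return graph.name + " finished";
        }
    }

    /** Hypothetical stand-in for the client-side environment that owns the listeners. */
    static final class Environment {
        private final Cluster cluster;
        private final List<Consumer<String>> listeners = new ArrayList<>();

        Environment(Cluster cluster) { this.cluster = cluster; }

        void registerListener(Consumer<String> listener) { listeners.add(listener); }

        Graph buildGraph(String name) { return new Graph(name); }

        /** Listeners are notified here, on the client, after the cluster returns. */
        String execute(String name) {
            String result = cluster.run(buildGraph(name));
            listeners.forEach(listener -> listener.accept(result));
            return result;
        }
    }

    public static void main(String[] args) {
        Cluster cluster = new Cluster();
        Environment env = new Environment(cluster);
        List<String> notifications = new ArrayList<>();
        env.registerListener(notifications::add);

        // Path 1: execute() is in the call chain, so the listener is notified.
        env.execute("via-execute");

        // Path 2: the graph is extracted and handed to the cluster directly,
        // so the code that notifies listeners never runs.
        cluster.run(env.buildGraph("via-direct-submit"));

        System.out.println("jobs run:      " + cluster.executed);
        System.out.println("notifications: " + notifications);
    }
}
```

Both jobs run, but only the one that went through execute() produces a notification. Submitting an extracted graph directly corresponds to the second path.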

Does that help to debug the problem?

Aljoscha

On 19.11.20 09:49, Flavio Pompermaier wrote:
I have a Spring Boot job server that acts as a broker between our
application and a Flink session cluster. To submit a job I use the
FlinkRestClient (which, if I'm not wrong, is also the one the CLI client
uses for the run action). However, neither method triggers the job
listener.

On Thu, 19 Nov 2020, 09:39 Aljoscha Krettek <aljos...@apache.org> wrote:

@Flavio, when you're saying you're using the RestClusterClient, you are
not actually using that manually, right? You're just submitting your job
via "bin/flink run ...", right?

What's the exact invocation of "bin/flink run" that you're using?

On 19.11.20 09:29, Andrey Zagrebin wrote:
Hi Flavio,

I think I can reproduce what you are reporting (assuming you also pass
'--output' to 'flink run').
I am not sure why it behaves like this. I would suggest filing a Jira
ticket for this.

Best,
Andrey

On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:

Is this a bug or a documentation problem?

On Sat, 14 Nov 2020, 18:44 Flavio Pompermaier <pomperma...@okkam.it> wrote:

I've also verified that the problem persists when using a modified version
of the WordCount class.
If you add the code pasted at the end of this email to the end of its
main method, you can verify that the listener is called if you run the
program from the IDE, but not if you submit the job with the CLI client
using the command

     - bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar

Maybe this is the expected behaviour, but I didn't find any documentation of
it (neither in the Javadoc nor on the Flink website, where I can't find any
documentation about JobListener at all).

[Code to add to main()]
      // emit result
      if (params.has("output")) {
        counts.writeAsCsv(params.get("output"), "\n", " ");
        // execute program
        env.registerJobListener(new JobListener() {

          @Override
          public void onJobSubmitted(JobClient arg0, Throwable arg1) {
            System.out.println("**************** SUBMITTED");
          }

          @Override
          public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
            System.out.println("**************** EXECUTED");
          }
        });
        env.execute("WordCount Example");
      } else {
        System.out.println("Printing result to stdout. Use --output to specify output path.");
        counts.print();
      }

On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

see inline

On Fri, 13 Nov 2020, 14:31 Matthias Pohl <matth...@ververica.com> wrote:

Hi Flavio,
thanks for sharing this with the Flink community. Could you answer the
following questions, please:
- What's the code of your Job's main method?


It's actually very simple: the main class creates a batch execution env
using ExecutionEnvironment.getExecutionEnvironment(), I register a job
listener on the env, and I do some stuff before calling env.execute().
The listener is executed correctly, but if I use the RestClusterClient to
submit the JobGraph extracted from that main class (contained in a jar), the
program is executed as usual but the job listener is not called.
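
When a JobGraph is submitted directly like this, one option is to invoke the callbacks by hand around the submission. The following is a minimal sketch of that idea using only java.util.concurrent. Client, Callbacks, FakeClient and RecordingCallbacks are hypothetical names made up for this example and are not Flink classes; the sketch assumes the real client exposes a future for the submission and another for the job result (Flink's ClusterClient interface appears to offer submitJob and requestJobResult for this, but check the Javadoc of your version).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ManualListenerSketch {

    /** Hypothetical callback interface, loosely mirroring the two JobListener hooks. */
    interface Callbacks {
        void onSubmitted(String jobId, Throwable error);
        void onExecuted(String result, Throwable error);
    }

    /** Hypothetical stand-in for a client that submits a job and later reports its result. */
    interface Client {
        CompletableFuture<String> submit(String jobGraph);
        CompletableFuture<String> requestResult(String jobId);
    }

    /**
     * Submits the job and calls the callbacks explicitly, because no execute()
     * call is in the chain to do it. A failed submission also surfaces in
     * onExecuted as a failure.
     */
    static CompletableFuture<String> submitWithCallbacks(
            Client client, String jobGraph, Callbacks callbacks) {
        return client.submit(jobGraph)
                .whenComplete(callbacks::onSubmitted)
                .thenCompose(client::requestResult)
                .whenComplete(callbacks::onExecuted);
    }

    /** In-memory fake so the sketch runs without a cluster (assumption for illustration). */
    static final class FakeClient implements Client {
        @Override
        public CompletableFuture<String> submit(String jobGraph) {
            return CompletableFuture.completedFuture("job-1");
        }

        @Override
        public CompletableFuture<String> requestResult(String jobId) {
            return CompletableFuture.completedFuture(jobId + ":FINISHED");
        }
    }

    /** Records every callback invocation in order. */
    static final class RecordingCallbacks implements Callbacks {
        final List<String> events = new ArrayList<>();

        @Override
        public void onSubmitted(String jobId, Throwable error) {
            events.add(error == null ? "SUBMITTED " + jobId : "SUBMIT FAILED");
        }

        @Override
        public void onExecuted(String result, Throwable error) {
            events.add(error == null ? "EXECUTED " + result : "EXECUTION FAILED");
        }
    }

    public static void main(String[] args) {
        RecordingCallbacks callbacks = new RecordingCallbacks();
        String result =
                submitWithCallbacks(new FakeClient(), "word-count-graph", callbacks).join();
        System.out.println(callbacks.events + " -> " + result);
    }
}
```

The callbacks run in whichever process performs the submission (here, the broker), so anything they log ends up in that process's logs rather than in the cluster's.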

- What cluster backend and application do you use to execute the job?


I use a standalone session cluster for the moment

- Is there anything suspicious you can find in the logs that might be
related?


No, unfortunately.


Best,
Matthias

On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

Actually, what I'm experiencing is that the JobListener is executed
successfully if I run my main class from the IDE, while the job listener is
not fired at all if I submit the JobGraph of the application to a cluster
using the RestClusterClient.
Am I doing something wrong?

My main class ends with env.execute(), and I call
env.registerJobListener() when I create the execution env
via ExecutionEnvironment.getExecutionEnvironment().

Thanks in advance for any help,
Flavio

On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with
Flink 1.11.0).
It works great, but I have the problem that log statements inside
onJobExecuted are not logged anywhere. Is that normal?

Best,
Flavio






