Hi Flavio and Aljoscha,

Sorry for the late reply. I could not actually reproduce the reported
problem with 'flink run' and a local standalone cluster on master.
I get the expected output with the suggested modification of the WordCount
program:

$ bin/start-cluster.sh

$ rm -rf out; bin/flink run \
    flink/flink-examples/flink-examples-batch/target/WordCount.jar \
    --output flink/build-target/out

Executing WordCount example with default input data set.
Use --input to specify file input.
**************** SUBMITTED
Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
Program execution finished
Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
Job Runtime: 139 ms

**************** EXECUTED
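
This is consistent with Aljoscha's note quoted below: the listener callbacks
are driven by the client-side execute() call, not by the cluster. A minimal
sketch of that idea in plain Java follows. It is only a toy model, not Flink's
implementation: Listener, Cluster, Environment and the run* helpers are
hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class JobListenerSketch {

    /** Hypothetical stand-in for a job listener; NOT Flink's real interface. */
    interface Listener {
        void onJobSubmitted(String jobId);
        void onJobExecuted(String jobId);
    }

    /** Hypothetical cluster side: it runs jobs and knows nothing about listeners. */
    static final class Cluster {
        private int submitted = 0;

        String submit(String jobName) {
            submitted++;
            return jobName + "-" + submitted;
        }

        void awaitCompletion(String jobId) {
            // The toy job finishes immediately, so there is nothing to wait for.
        }
    }

    /** Hypothetical client-side environment: it owns the registered listeners. */
    static final class Environment {
        private final Cluster cluster;
        private final List<Listener> listeners = new ArrayList<>();

        Environment(Cluster cluster) {
            this.cluster = cluster;
        }

        void registerJobListener(Listener listener) {
            listeners.add(listener);
        }

        /** The only place in this model where listeners are notified. */
        String execute(String jobName) {
            String jobId = cluster.submit(jobName);
            for (Listener l : listeners) {
                l.onJobSubmitted(jobId);
            }
            cluster.awaitCompletion(jobId);
            for (Listener l : listeners) {
                l.onJobExecuted(jobId);
            }
            return jobId;
        }
    }

    /** Builds a listener that records the callbacks it receives. */
    static Listener recording(List<String> events) {
        return new Listener() {
            @Override
            public void onJobSubmitted(String jobId) {
                events.add("SUBMITTED");
            }

            @Override
            public void onJobExecuted(String jobId) {
                events.add("EXECUTED");
            }
        };
    }

    /** Path 1: the job goes through execute(), so the listener fires. */
    static List<String> runViaExecute() {
        List<String> events = new ArrayList<>();
        Environment env = new Environment(new Cluster());
        env.registerJobListener(recording(events));
        env.execute("WordCount Example");
        return events;
    }

    /** Path 2: the job is handed straight to the cluster, bypassing execute(). */
    static List<String> runViaDirectSubmit() {
        List<String> events = new ArrayList<>();
        Cluster cluster = new Cluster();
        Environment env = new Environment(cluster);
        env.registerJobListener(recording(events));
        String jobId = cluster.submit("WordCount Example");
        cluster.awaitCompletion(jobId);
        return events;
    }

    public static void main(String[] args) {
        System.out.println("execute():     " + runViaExecute());      // [SUBMITTED, EXECUTED]
        System.out.println("direct submit: " + runViaDirectSubmit()); // []
    }
}
```

In this model the listener stays silent on the second path simply because
nothing on that path ever calls it. A broker that submits an extracted JobGraph
itself would, under the same assumption, have to track job completion on its
own side.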

Best,
Andrey

On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> JobListener.onJobExecuted() is only invoked in
> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
> of these is still in the call chain with that setup then the listener
> will not be invoked.
>
> Also, this would only happen on the client, not on the broker (in your
> case) or the server (JobManager).
>
> Does that help to debug the problem?
>
> Aljoscha
>
> On 19.11.20 09:49, Flavio Pompermaier wrote:
> > I have a Spring Boot job server that acts as a broker between our
> > application and a Flink session cluster. To submit a job I use the
> > FlinkRestClient (which, if I'm not wrong, is also the one the CLI client
> > uses for the run action). However, neither method triggers the job
> > listener.
> >
> > On Thu, Nov 19, 2020, 09:39 Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> >> @Flavio, when you're saying you're using the RestClusterClient, you are
> >> not actually using that manually, right? You're just submitting your job
> >> via "bin/flink run ...", right?
> >>
> >> What's the exact invocation of "bin/flink run" that you're using?
> >>
> >> On 19.11.20 09:29, Andrey Zagrebin wrote:
> >>> Hi Flavio,
> >>>
> >>> I think I can reproduce what you are reporting (assuming you also pass
> >>> '--output' to 'flink run').
> >>> I am not sure why it behaves like this. I would suggest filing a Jira
> >>> ticket for this.
> >>>
> >>> Best,
> >>> Andrey
> >>>
> >>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it>
> >>> wrote:
> >>>
> >>>> is this a bug or is it a documentation problem...?
> >>>>
> >>>> On Sat, Nov 14, 2020, 18:44 Flavio Pompermaier <pomperma...@okkam.it>
> >>>> wrote:
> >>>>
> >>>>> I've also verified that the problem persists with a modified version
> >>>>> of the WordCount class.
> >>>>> If you add the code pasted at the end of this email to the end of its
> >>>>> main method, you can verify that the listener is called if you run the
> >>>>> program from the IDE, but not if you submit the job with the CLI
> >>>>> client using the command
> >>>>>
> >>>>>      - bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
> >>>>>
> >>>>> Maybe this is the expected result, but I didn't find any documentation
> >>>>> of this behaviour (neither in the Javadoc nor on the Flink web site,
> >>>>> where I can't find any documentation about JobListener at all).
> >>>>>
> >>>>> [Code to add to main()]
> >>>>>       // emit result
> >>>>>       if (params.has("output")) {
> >>>>>         counts.writeAsCsv(params.get("output"), "\n", " ");
> >>>>>         // execute program
> >>>>>         env.registerJobListener(new JobListener() {
> >>>>>
> >>>>>           @Override
> >>>>>           public void onJobSubmitted(JobClient arg0, Throwable arg1) {
> >>>>>             System.out.println("**************** SUBMITTED");
> >>>>>           }
> >>>>>
> >>>>>           @Override
> >>>>>           public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
> >>>>>             System.out.println("**************** EXECUTED");
> >>>>>           }
> >>>>>         });
> >>>>>         env.execute("WordCount Example");
> >>>>>       } else {
> >>>>>         System.out.println("Printing result to stdout. Use --output to specify output path.");
> >>>>>         counts.print();
> >>>>>       }
> >>>>>
> >>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it>
> >>>>> wrote:
> >>>>>
> >>>>>> see inline
> >>>>>>
> >>>>>> On Fri, Nov 13, 2020, 14:31 Matthias Pohl <matth...@ververica.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Flavio,
> >>>>>>> thanks for sharing this with the Flink community. Could you answer
> >>>>>>> the following questions, please:
> >>>>>>> - What's the code of your Job's main method?
> >>>>>>>
> >>>>>>
> >>>>>> It's actually very simple: the main class creates a batch execution
> >>>>>> env using ExecutionEnvironment.getExecutionEnvironment(), I register a
> >>>>>> job listener on the env, and I do some stuff before calling
> >>>>>> env.execute().
> >>>>>> The listener is executed correctly, but if I use the RestClusterClient
> >>>>>> to submit the JobGraph extracted from that main method, packaged in a
> >>>>>> jar, the program is executed as usual but the job listener is not
> >>>>>> called.
> >>>>>>
> >>>>>>> - What cluster backend and application do you use to execute the job?
> >>>>>>>
> >>>>>>
> >>>>>> I use a standalone session cluster for the moment
> >>>>>>
> >>>>>>> - Is there anything suspicious you can find in the logs that might be
> >>>>>>> related?
> >>>>>>>
> >>>>>>
> >>>>>> No, unfortunately.
> >>>>>>
> >>>>>>
> >>>>>>> Best,
> >>>>>>> Matthias
> >>>>>>>
> >>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
> >>>>>>> pomperma...@okkam.it> wrote:
> >>>>>>>
> >>>>>>>> Actually, what I'm experiencing is that the JobListener is executed
> >>>>>>>> successfully if I run my main class from the IDE, while the job
> >>>>>>>> listener is not fired at all if I submit the JobGraph of the
> >>>>>>>> application to a cluster using the RestClusterClient.
> >>>>>>>> Am I doing something wrong?
> >>>>>>>>
> >>>>>>>> My main class ends with env.execute(), and I call
> >>>>>>>> env.registerJobListener() when I create the execution env
> >>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
> >>>>>>>>
> >>>>>>>> Thanks in advance for any help,
> >>>>>>>> Flavio
> >>>>>>>>
> >>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
> >>>>>>>> pomperma...@okkam.it> wrote:
> >>>>>>>>
> >>>>>>>>> Hello everybody,
> >>>>>>>>> I'm trying to use the JobListener to track when a job finishes
> >>>>>>>>> (with Flink 1.11.0).
> >>>>>>>>> It works great, but I have the problem that logs inside
> >>>>>>>>> onJobExecuted are not logged anywhere. Is that normal?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Flavio
> >>>>>>>>>
> >>>>>>>>
> >>>
> >>
> >>
> >
>
>
