I have a Spring Boot job server that acts as a broker between our
application and a Flink session cluster. To submit a job I use the
FlinkRestClient (which, if I'm not wrong, is also the client the CLI uses
for the run action). However, neither method triggers the job
listener.
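
To make the two submission paths concrete, here is a toy model in plain Java. It is not Flink code: every class and method name below is a hypothetical stand-in, and it rests on the assumption that listeners are stored on the client-side environment object and are only notified from inside its execute() call. Under that assumption, submitting an extracted job graph directly never reaches the listeners. It models only the direct JobGraph submission path and says nothing about why "bin/flink run" behaves the way it does.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these types are hypothetical stand-ins, not Flink's API.
interface ToyJobListener {
    void onJobSubmitted(String jobName);
    void onJobExecuted(String jobName);
}

// Stand-in for a REST-style client that only ever sees the job graph.
class ToyClusterClient {
    static String submit(String jobGraph) {
        return "ran " + jobGraph;
    }
}

// Stand-in for a client-side execution environment that owns the listeners.
class ToyEnvironment {
    private final List<ToyJobListener> listeners = new ArrayList<>();

    void registerJobListener(ToyJobListener listener) {
        listeners.add(listener);
    }

    // The extracted graph carries the job, not the registered listeners.
    String extractJobGraph(String jobName) {
        return jobName;
    }

    // Assumption: listeners are notified only from inside execute().
    void execute(String jobName) {
        String graph = extractJobGraph(jobName);
        listeners.forEach(l -> l.onJobSubmitted(graph));
        ToyClusterClient.submit(graph);
        listeners.forEach(l -> l.onJobExecuted(graph));
    }
}

class ListenerDemo {
    // Builds an environment whose listener records each callback in 'events'.
    static ToyEnvironment newEnvironment(List<String> events) {
        ToyEnvironment env = new ToyEnvironment();
        env.registerJobListener(new ToyJobListener() {
            @Override public void onJobSubmitted(String jobName) { events.add("SUBMITTED"); }
            @Override public void onJobExecuted(String jobName) { events.add("EXECUTED"); }
        });
        return env;
    }

    // Path 1: the main method calls execute() on the environment itself.
    static List<String> runViaExecute() {
        List<String> events = new ArrayList<>();
        newEnvironment(events).execute("WordCount Example");
        return events;
    }

    // Path 2: the job graph is extracted and handed straight to the client.
    static List<String> runViaExtractedGraph() {
        List<String> events = new ArrayList<>();
        ToyEnvironment env = newEnvironment(events);
        ToyClusterClient.submit(env.extractJobGraph("WordCount Example"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println("execute(): " + runViaExecute());
        System.out.println("extracted graph: " + runViaExtractedGraph());
    }
}
```

In this model the first path records both callbacks and the second records none, which matches what I see when submitting the JobGraph myself.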

On Thu, Nov 19, 2020, 09:39 Aljoscha Krettek <aljos...@apache.org> wrote:

> @Flavio, when you're saying you're using the RestClusterClient, you are
> not actually using that manually, right? You're just submitting your job
> via "bin/flink run ...", right?
>
> What's the exact invocation of "bin/flink run" that you're using?
>
> On 19.11.20 09:29, Andrey Zagrebin wrote:
> > Hi Flavio,
> >
> > I think I can reproduce what you are reporting (assuming you also pass
> > '--output' to 'flink run').
> > I am not sure why it behaves like this. I would suggest filing a Jira
> > ticket for this.
> >
> > Best,
> > Andrey
> >
> > On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it>
> > wrote:
> >
> >> Is this a bug or a documentation problem?
> >>
> >> On Sat, Nov 14, 2020, 18:44 Flavio Pompermaier <pomperma...@okkam.it>
> >> wrote:
> >>
> >>> I've also verified that the problem persists when using a modified
> >>> version of the WordCount class.
> >>> If you add the code pasted at the end of this email to the end of its
> >>> main method, you can verify that the listener is called if you run the
> >>> program from the IDE, but not if you submit the job with the CLI
> >>> client using the command
> >>>
> >>>     - bin/flink run
> >>>       /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
> >>>
> >>> Maybe this is the expected result, but I didn't find any documentation of
> >>> this behaviour (neither in the Javadoc nor on the Flink web site, where I
> >>> can't find any documentation about JobListener at all).
> >>>
> >>> [Code to add to main()]
> >>>      // needs imports for org.apache.flink.core.execution.JobListener,
> >>>      // org.apache.flink.core.execution.JobClient and
> >>>      // org.apache.flink.api.common.JobExecutionResult
> >>>      // emit result
> >>>      if (params.has("output")) {
> >>>        counts.writeAsCsv(params.get("output"), "\n", " ");
> >>>        // register the listener before executing the program
> >>>        env.registerJobListener(new JobListener() {
> >>>
> >>>          @Override
> >>>          public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
> >>>            System.out.println("**************** SUBMITTED");
> >>>          }
> >>>
> >>>          @Override
> >>>          public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
> >>>            System.out.println("**************** EXECUTED");
> >>>          }
> >>>        });
> >>>        // execute program
> >>>        env.execute("WordCount Example");
> >>>      } else {
> >>>        System.out.println("Printing result to stdout. Use --output to specify output path.");
> >>>        counts.print();
> >>>      }
> >>>
> >>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it>
> >>> wrote:
> >>>
> >>>> see inline
> >>>>
> >>>> On Fri, Nov 13, 2020, 14:31 Matthias Pohl <matth...@ververica.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Flavio,
> >>>>> thanks for sharing this with the Flink community. Could you answer the
> >>>>> following questions, please:
> >>>>> - What's the code of your Job's main method?
> >>>>>
> >>>>
> >>>> It's actually very simple: the main class creates a batch execution env
> >>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a job
> >>>> listener on the env, and I do some stuff before calling env.execute().
> >>>> The listener is executed correctly when I run the main class directly,
> >>>> but if I use the RestClusterClient to submit the JobGraph extracted from
> >>>> that main contained in a jar, the program is executed as usual but the
> >>>> job listener is not called.
> >>>>
> >>>> - What cluster backend and application do you use to execute the job?
> >>>>>
> >>>>
> >>>> I use a standalone session cluster for the moment
> >>>>
> >>>> - Is there anything suspicious you can find in the logs that might be
> >>>>> related?
> >>>>>
> >>>>
> >>>> No, unfortunately.
> >>>>
> >>>>
> >>>>> Best,
> >>>>> Matthias
> >>>>>
> >>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
> >>>>> pomperma...@okkam.it> wrote:
> >>>>>
> >>>>>> Actually what I'm experiencing is that the JobListener is executed
> >>>>>> successfully if I run my main class from the IDE, while the job
> >>>>>> listener is not fired at all if I submit the JobGraph of the
> >>>>>> application to a cluster using the RestClusterClient.
> >>>>>> Am I doing something wrong?
> >>>>>>
> >>>>>> My main class ends with env.execute(), and I call
> >>>>>> env.registerJobListener() when I create the execution env
> >>>>>> via ExecutionEnvironment.getExecutionEnvironment().
> >>>>>>
> >>>>>> Thanks in advance for any help,
> >>>>>> Flavio
> >>>>>>
> >>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
> >>>>>> pomperma...@okkam.it> wrote:
> >>>>>>
> >>>>>>> Hello everybody,
> >>>>>>> I'm trying to use the JobListener to track when a job finishes (with
> >>>>>>> Flink 1.11.0).
> >>>>>>> It works great, but I have the problem that logs inside
> >>>>>>> onJobExecuted are not logged anywhere. Is that normal?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Flavio
> >>>>>>>
> >>>>>>
> >
>
>
