You're right..I removed my flink dir and I re-extracted it and now it
works. Unfortunately I didn't keep the old version to understand what
were the difference but the error was probably caused by the fact that
I had a previous version of the WordCount.jar (without the listener)
in the flink lib dir.. (in another dev session I was experimenting in
running the job having the user jar in the lib dir). Sorry for the
confusion.
Just one last question: is the listener executed on the client or on
the job server? This is not entirely clear to me..

Best,
Flavio

On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin <azagre...@apache.org> wrote:
>
> I also tried 1.11.0 and 1.11.2, both work for me.
>
> On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> Hmm, there was this issue:
>> https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed
>> in your version.
>>
>> On 19.11.20 12:58, Flavio Pompermaier wrote:
>> > Which version are you using?
>> > I used the exact same commands on Flink 1.11.0 and I didn't get the job
>> > listener output..
>> >
>> > Il gio 19 nov 2020, 12:53 Andrey Zagrebin <azagre...@apache.org> ha 
>> > scritto:
>> >
>> >> Hi Flavio and Aljoscha,
>> >>
>> >> Sorry for the late heads up. I could not actually reproduce the reported
>> >> problem with 'flink run' and local standalone cluster on master.
>> >> I get the expected output with the suggested modification of WordCount
>> >> program:
>> >>
>> >> $ bin/start-cluster.sh
>> >>
>> >> $ rm -rf out; bin/flink run
>> >> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
>> >> flink/build-target/out
>> >>
>> >> Executing WordCount example with default input data set.
>> >> Use --input to specify file input.
>> >> **************** SUBMITTED
>> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
>> >> Program execution finished
>> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
>> >> Job Runtime: 139 ms
>> >>
>> >> **************** EXECUTED
>> >>
>> >> Best,
>> >> Andrey
>> >>
>> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <aljos...@apache.org>
>> >> wrote:
>> >>
>> >>> JobListener.onJobExecuted() is only invoked in
>> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
>> >>> of these is still in the call chain with that setup then the listener
>> >>> will not be invoked.
>> >>>
>> >>> Also, this would only happen on the client, not on the broker (in your
>> >>> case) or the server (JobManager).
>> >>>
>> >>> Does that help to debug the problem?
>> >>>
>> >>> Aljoscha
>> >>>
>> >>> On 19.11.20 09:49, Flavio Pompermaier wrote:
>> >>>> I have a spring boot job server that act as a broker towards our
>> >>>> application and a Flink session cluster. To submit a job I use the
>> >>>> FlinkRestClient (that is also the one used in the CLI client when I use
>> >>> the
>> >>>> run action it if I'm not wrong). However both methods don't trigger the
>> >>> job
>> >>>> listener.
>> >>>>
>> >>>> Il gio 19 nov 2020, 09:39 Aljoscha Krettek <aljos...@apache.org> ha
>> >>> scritto:
>> >>>>
>> >>>>> @Flavio, when you're saying you're using the RestClusterClient, you are
>> >>>>> not actually using that manually, right? You're just submitting your
>> >>> job
>> >>>>> via "bin/flink run ...", right?
>> >>>>>
>> >>>>> What's the exact invocation of "bin/flink run" that you're using?
>> >>>>>
>> >>>>> On 19.11.20 09:29, Andrey Zagrebin wrote:
>> >>>>>> Hi Flavio,
>> >>>>>>
>> >>>>>> I think I can reproduce what you are reporting (assuming you also pass
>> >>>>>> '--output' to 'flink run').
>> >>>>>> I am not sure why it behaves like this. I would suggest filing a Jira
>> >>>>>> ticket for this.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Andrey
>> >>>>>>
>> >>>>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <
>> >>> pomperma...@okkam.it
>> >>>>>>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> is this a bug or is it a documentation problem...?
>> >>>>>>>
>> >>>>>>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <pomperma...@okkam.it>
>> >>> ha
>> >>>>>>> scritto:
>> >>>>>>>
>> >>>>>>>> I've also verified that the problem persist also using a modified
>> >>>>> version
>> >>>>>>>> of the WordCount class.
>> >>>>>>>> If you add the code pasted at the end of this email at the end of
>> >>> its
>> >>>>>>>> main method you can verify that the listener is called if you run
>> >>> the
>> >>>>>>>> program from the IDE, but it's not called if you submit the job
>> >>> using
>> >>>>> the
>> >>>>>>>> CLI client using the command
>> >>>>>>>>
>> >>>>>>>>       - bin/flink run
>> >>>>>>>>
>> >>>>>
>> >>>   
>> >>> /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>> >>>>>>>>
>> >>>>>>>> Maybe this is an expected result but I didn't find any
>> >>> documentation of
>> >>>>>>>> this behaviour (neither in the Javadoc or in the flink web site,
>> >>> where
>> >>>>> I
>> >>>>>>>> can't find any documentation about JobListener at all).
>> >>>>>>>>
>> >>>>>>>> [Code to add to main()]
>> >>>>>>>>        // emit result
>> >>>>>>>>        if (params.has("output")) {
>> >>>>>>>>          counts.writeAsCsv(params.get("output"), "\n", " ");
>> >>>>>>>>          // execute program
>> >>>>>>>>          env.registerJobListener(new JobListener() {
>> >>>>>>>>
>> >>>>>>>>            @Override
>> >>>>>>>>            public void onJobSubmitted(JobClient arg0, Throwable
>> >>> arg1) {
>> >>>>>>>>              System.out.println("**************** SUBMITTED");
>> >>>>>>>>            }
>> >>>>>>>>
>> >>>>>>>>            @Override
>> >>>>>>>>            public void onJobExecuted(JobExecutionResult arg0,
>> >>> Throwable
>> >>>>>>>> arg1) {
>> >>>>>>>>              System.out.println("**************** EXECUTED");
>> >>>>>>>>            }
>> >>>>>>>>          });
>> >>>>>>>>          env.execute("WordCount Example");
>> >>>>>>>>        } else {
>> >>>>>>>>          System.out.println("Printing result to stdout. Use --output
>> >>> to
>> >>>>>>>> specify output path.");
>> >>>>>>>>          counts.print();
>> >>>>>>>>        }
>> >>>>>>>>
>> >>>>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <
>> >>>>> pomperma...@okkam.it>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> see inline
>> >>>>>>>>>
>> >>>>>>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <matth...@ververica.com>
>> >>> ha
>> >>>>>>>>> scritto:
>> >>>>>>>>>
>> >>>>>>>>>> Hi Flavio,
>> >>>>>>>>>> thanks for sharing this with the Flink community. Could you answer
>> >>>>> the
>> >>>>>>>>>> following questions, please:
>> >>>>>>>>>> - What's the code of your Job's main method?
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> it's actually very simple...the main class creates a batch
>> >>> execution
>> >>>>> env
>> >>>>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a
>> >>> job
>> >>>>>>>>> listener to the env and I do some stuff before calling
>> >>> env.execute().
>> >>>>>>>>> The listener is executed correctly but if I use the
>> >>> RestClusterClient
>> >>>>> to
>> >>>>>>>>> sibmit the jobGraph exyracted from that main contained in a jar,
>> >>> the
>> >>>>>>>>> program is executed as usual but the job listener is not called.
>> >>>>>>>>>
>> >>>>>>>>> - What cluster backend and application do you use to execute the
>> >>> job?
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> I use a standalone session cluster for the moment
>> >>>>>>>>>
>> >>>>>>>>> - Is there anything suspicious you can find in the logs that might
>> >>> be
>> >>>>>>>>>> related?
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> no unfortunately..
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>> Best,
>> >>>>>>>>>> Matthias
>> >>>>>>>>>>
>> >>>>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
>> >>>>>>>>>> pomperma...@okkam.it> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Actually what I'm experiencing is that the JobListener is
>> >>> executed
>> >>>>>>>>>>> successfully if I run my main class from the IDE, while the job
>> >>>>> listener is
>> >>>>>>>>>>> not fired at all if I submit the JobGraph of the application to a
>> >>>>> cluster
>> >>>>>>>>>>> using the RestClusterClient..
>> >>>>>>>>>>> Am I doing something wrong?
>> >>>>>>>>>>>
>> >>>>>>>>>>> My main class ends with the env.execute() and i do
>> >>>>>>>>>>> env.registerJobListener() when I create the Exceution env
>> >>>>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks in advance for any help,
>> >>>>>>>>>>> Flavio
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
>> >>>>>>>>>>> pomperma...@okkam.it> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hello everybody,
>> >>>>>>>>>>>> I'm trying to use the JobListener to track when a job finishes
>> >>>>> (with
>> >>>>>>>>>>>> Flink 1.11.0).
>> >>>>>>>>>>>> It works great but I have the problem that logs inside
>> >>>>>>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Best,
>> >>>>>>>>>>>> Flavio
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>>
>> >
>>

Reply via email to