Re: Cannot run pyflink example using Flink CLI

2022-10-21 Thread Levan Huyen
Great, thanks!

Kind regards,
Levan Huyen

Re: Cannot run pyflink example using Flink CLI

2022-10-20 Thread Biao Geng
You are right.
It contains the python package `pyflink` and some dependencies like py4j
and cloudpickle, but it does not contain all relevant dependencies (e.g.
`google.protobuf`, as the error log shows, which I also reproduced on my
own machine).
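A quick way to see which of these dependencies a given interpreter is missing is to probe them with `importlib` (a diagnostic sketch, not part of pyflink itself; the module names listed are just the ones mentioned in this thread):

```python
import importlib.util


def missing_modules(names):
    """Return the subset of `names` that the current interpreter cannot import."""
    missing = []
    for name in names:
        try:
            # find_spec() imports the parent package for dotted names,
            # so a missing parent (e.g. `google`) raises instead of returning None.
            if importlib.util.find_spec(name) is None:
                missing.append(name)
        except ModuleNotFoundError:
            missing.append(name)
    return missing


if __name__ == "__main__":
    # Run this with the same interpreter Flink uses (e.g. /usr/bin/python3).
    print(missing_modules(["pyflink", "py4j", "cloudpickle", "google.protobuf"]))
```

Running it with the interpreter that produced the error above should list `google.protobuf` as missing.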

Best,
Biao Geng

Re: Cannot run pyflink example using Flink CLI

2022-10-20 Thread Levan Huyen
Thanks Biao.

May I ask one more question: does the binary package on the Apache site (e.g.
https://archive.apache.org/dist/flink/flink-1.15.2) contain the python
package `pyflink` and its dependencies? I guess the answer is no.

Thanks and regards,
Levan Huyen

Re: Cannot run pyflink example using Flink CLI

2022-10-20 Thread Biao Geng
Hi Levan,
Great to hear that your issue is resolved!
For the follow-up question, I am not quite familiar with AWS EMR's
configuration for Flink, but based on the error you attached, it looks like
pyflink may not ship some 'Google' dependencies in the Flink binary
zip file and, as a result, it tries to find them in your python
environment. cc @hxbks...@gmail.com
For now, to manage complex python dependencies, the typical way to use
pyflink on multi-node clusters in production is to create a venv and
use it in your `flink run` command or in the python code. You can refer to
this doc for details.
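A minimal sketch of that venv workflow (the option spellings follow the Flink 1.15 CLI docs; the version pin, file names, and paths are just examples, and a relocatable env tool such as `venv-pack` may be preferable for real clusters):

```shell
# Create a venv with the matching apache-flink version and zip it
# so it can be shipped to the cluster nodes.
python3 -m venv venv
source venv/bin/activate
pip install apache-flink==1.15.2
zip -r venv.zip venv

# Use the venv for both the client process and the Python workers.
./bin/flink run \
  -pyarch venv.zip \
  -pyclientexec venv/bin/python \
  -pyexec venv.zip/venv/bin/python \
  -py examples/python/datastream/word_count.py
```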

Best,
Biao Geng

Re: Cannot run pyflink example using Flink CLI

2022-10-19 Thread Levan Huyen
Hi Biao,

Thanks for your help. That solved my issue. It turned out that in setup1
(in EMR), I had apache-flink installed, but the package (and its
dependencies) are not in the directory `/usr/lib/python3.7/site-packages`
(corresponding to the python binary in `/usr/bin/python3`). For some
reason, the packages are in the current user's location (`~/.local/...`),
which Flink did not look at.

BTW, is there any way to use the pyflink shipped with the Flink binary zip
file that I downloaded from Apache's site? On EMR, such a package is
included, and it feels awkward to have to install another version using
`pip install`. It is also confusing where to add the dependency jars.
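One way to spot this kind of mismatch is to ask the interpreter that Flink invokes where it actually searches for packages (a diagnostic sketch; run it with the same binary Flink uses, e.g. `/usr/bin/python3`):

```python
import site
import sys

# The interpreter being used, and every location it searches for packages.
print("interpreter:          ", sys.executable)
print("system site-packages: ", site.getsitepackages())
# `pip install --user` puts packages here (~/.local/... on Linux):
print("user site-packages:   ", site.getusersitepackages())
print("user site enabled:    ", site.ENABLE_USER_SITE)
```

If `pyflink` lives under the user location but the process runs with user site-packages disabled (or as a different user), imports will fail exactly as above.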

Thanks and regards,
Levan Huyen



Re: Cannot run pyflink example using Flink CLI

2022-10-19 Thread Biao Geng
Hi Levan,

For your setup1 & 2, it looks like the python environment is not ready.
Have you tried `python -m pip install apache-flink` for the first 2 setups?
For your setup3, since you are using the `flink run ...` command, it will
try to connect to a running flink cluster, but I guess you did not launch
one. You can run `start-cluster.sh` first to launch a standalone flink
cluster and then try the `flink run ...` command again.
For your setup4, it works because it uses the default mini cluster to run
the pyflink job. So even if you haven't started a standalone cluster, it
still works.
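Concretely, the sequence for setup3 would look like this (a sketch, assuming the commands are run from the root of the Flink distribution):

```shell
# Start a local standalone cluster (JobManager + TaskManager),
# then submit the job against it, then tear the cluster down.
./bin/start-cluster.sh
./bin/flink run -py examples/python/datastream/word_count.py
./bin/stop-cluster.sh
```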

Best,
Biao Geng


Cannot run pyflink example using Flink CLI

2022-10-19 Thread Levan Huyen
Hi,

I'm new to PyFlink, and I couldn't run a basic example that shipped with
Flink.
This is the command I tried:

./bin/flink run -py examples/python/datastream/word_count.py

Below are the results I got with different setups:

1. On AWS EMR 6.8.0 (Flink 1.15.1):
Error: No module named 'google'
I tried with the Flink shipped with EMR, or the binary v1.15.1/v1.15.2
downloaded from the Flink site. I got that same error message in all cases.

Traceback (most recent call last):

  File "/usr/lib64/python3.7/runpy.py", line 193, in _run_module_as_main

"__main__", mod_spec)

  File "/usr/lib64/python3.7/runpy.py", line 85, in _run_code

exec(code, run_globals)

  File
"/tmp/pyflink/622f19cc-6616-45ba-b7cb-20a1462c4c3f/a29eb7a4-fff1-4d98-9b38-56b25f97b70a/word_count.py",
line 134, in <module>

word_count(known_args.input, known_args.output)

  File
"/tmp/pyflink/622f19cc-6616-45ba-b7cb-20a1462c4c3f/a29eb7a4-fff1-4d98-9b38-56b25f97b70a/word_count.py",
line 89, in word_count

ds = ds.flat_map(split) \

  File
"/usr/lib/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py",
line 333, in flat_map

  File
"/usr/lib/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py",
line 557, in process

  File
"/usr/lib/flink/opt/python/pyflink.zip/pyflink/fn_execution/flink_fn_execution_pb2.py",
line 23, in <module>

ModuleNotFoundError: No module named 'google'

org.apache.flink.client.program.ProgramAbortException:
java.lang.RuntimeException: Python process exits with code: 1

2. On my Mac, without a virtual environment (so `-pyclientexec python3`
is included in the run command): I got the same error as on EMR, but there
is a stdout line from the `print()` calls in the Python script:

  File
"/Users/lvh/dev/tmp/flink-1.15.2/opt/python/pyflink.zip/pyflink/datastream/data_stream.py",
line 557, in process

  File "", line 259, in load_module

  File
"/Users/lvh/dev/tmp/flink-1.15.2/opt/python/pyflink.zip/pyflink/fn_execution/flink_fn_execution_pb2.py",
line 23, in <module>

ModuleNotFoundError: No module named 'google'

Executing word_count example with default input data set.

Use --input to specify file input.

org.apache.flink.client.program.ProgramAbortException:
java.lang.RuntimeException: Python process exits with code: 1

3. On my Mac, with a virtual environment and the Python package
`apache-flink` installed: Flink tried to connect to localhost:8081 (I
don't know why), and failed with 'connection refused'.

Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException:
Could not complete the operation. Number of retries has been exhausted.

at
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:395)

... 21 more

Caused by: java.util.concurrent.CompletionException:
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection refused: localhost/127.0.0.1:8081

4. If I run that same example job directly with Python (`python
word_count.py`), it runs well.

I tried with both v1.15.2 and 1.15.1, Python 3.7 & 3.8, and got the same
result.

Could someone please help?

Thanks.


Re: Flink CLI - pass command line arguments to a pyflink job

2021-11-24 Thread Dian Fu
Thanks for sharing! Great to hear!

On Wed, Nov 24, 2021 at 3:59 AM Kamil ty  wrote:

> Thank you Matthias and Dian!
>
> I have verified this command:
> bin/flink run -py  examples/python/table/batch/word_count.py --test "Hello
> World"
> Where the "--test" argument is accessed from the python code, and the
> arguments work as expected.
>
> Best regards
> Kamil
>
> On Tue, 23 Nov 2021 at 02:48, Dian Fu  wrote:
>
>> Hi Kamil,
>>
>> It's documented at the end of the page:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#submitting-pyflink-jobs
>> .
>>
>> Regards,
>> Dian
>>
>> On Tue, Nov 23, 2021 at 12:10 AM Matthias Pohl 
>> wrote:
>>
>>> Hi Kamil,
>>> afaik, the parameter passing should work as normal by just appending
>>> them to the Flink job submission similar to the Java job submission:
>>> ```
>>> $ ./flink run --help
>>> Action "run" compiles and runs a program.
>>>   Syntax: run [OPTIONS]  
>>> [...]
>>> ```
>>>
>>> Matthias
>>>
>>> On Mon, Nov 22, 2021 at 3:58 PM Kamil ty  wrote:
>>>
>>>> Hey,
>>>>
>>>> Looking at the examples at Command-Line Interface | Apache Flink
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/>
>>>>  I
>>>> don't see an example of passing command line arguments to a pyflink job
>>>> when deploying the job to a remote cluster with flink cli. Is this
>>>> supported?
>>>>
>>>> Best Regards
>>>> Kamil
>>>>
>>>


Re: Flink CLI - pass command line arguments to a pyflink job

2021-11-23 Thread Kamil ty
Thank you Matthias and Dian!

I have verified this command:
bin/flink run -py  examples/python/table/batch/word_count.py --test "Hello
World"
Where the "--test" argument is accessed from the python code, and the
arguments work as expected.
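For reference, job-side access to such an argument can be sketched like this (illustrative only — the parser and the `--test` flag handling are assumptions; the real word_count.py may read sys.argv differently):

```python
# Illustrative sketch: how a PyFlink job script could read a "--test"
# argument appended after the script path in `bin/flink run`. The parser
# setup is an assumption, not taken from the actual word_count.py.
import argparse
import sys

def parse_job_args(argv):
    parser = argparse.ArgumentParser(description="PyFlink job arguments")
    parser.add_argument("--test", default=None, help="example job argument")
    # parse_known_args tolerates any extra flags the runtime may add
    known, _unknown = parser.parse_known_args(argv)
    return known

if __name__ == "__main__":
    args = parse_job_args(sys.argv[1:])
    print(f"--test was: {args.test!r}")
```

Anything after the `-py <script>` part of the command line is handed to the script itself, which is why the `--test "Hello World"` pair above is visible to the job.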

Best regards
Kamil

On Tue, 23 Nov 2021 at 02:48, Dian Fu  wrote:

> Hi Kamil,
>
> It's documented at the end of the page:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#submitting-pyflink-jobs
> .
>
> Regards,
> Dian
>
> On Tue, Nov 23, 2021 at 12:10 AM Matthias Pohl 
> wrote:
>
>> Hi Kamil,
>> afaik, the parameter passing should work as normal by just appending them
>> to the Flink job submission similar to the Java job submission:
>> ```
>> $ ./flink run --help
>> Action "run" compiles and runs a program.
>>   Syntax: run [OPTIONS]  
>> [...]
>> ```
>>
>> Matthias
>>
>> On Mon, Nov 22, 2021 at 3:58 PM Kamil ty  wrote:
>>
>>> Hey,
>>>
>>> Looking at the examples at Command-Line Interface | Apache Flink
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/>
>>>  I
>>> don't see an example of passing command line arguments to a pyflink job
>>> when deploying the job to a remote cluster with flink cli. Is this
>>> supported?
>>>
>>> Best Regards
>>> Kamil
>>>
>>


Re: Flink CLI - pass command line arguments to a pyflink job

2021-11-22 Thread Dian Fu
Hi Kamil,

It's documented at the end of the page:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#submitting-pyflink-jobs
.

Regards,
Dian

On Tue, Nov 23, 2021 at 12:10 AM Matthias Pohl 
wrote:

> Hi Kamil,
> afaik, the parameter passing should work as normal by just appending them
> to the Flink job submission similar to the Java job submission:
> ```
> $ ./flink run --help
> Action "run" compiles and runs a program.
>   Syntax: run [OPTIONS]  
> [...]
> ```
>
> Matthias
>
> On Mon, Nov 22, 2021 at 3:58 PM Kamil ty  wrote:
>
>> Hey,
>>
>> Looking at the examples at Command-Line Interface | Apache Flink
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/>
>>  I
>> don't see an example of passing command line arguments to a pyflink job
>> when deploying the job to a remote cluster with flink cli. Is this
>> supported?
>>
>> Best Regards
>> Kamil
>>
>


Re: Flink CLI - pass command line arguments to a pyflink job

2021-11-22 Thread Matthias Pohl
Hi Kamil,
afaik, the parameter passing should work as normal by just appending them
to the Flink job submission similar to the Java job submission:
```
$ ./flink run --help
Action "run" compiles and runs a program.
  Syntax: run [OPTIONS]  
[...]
```

Matthias

On Mon, Nov 22, 2021 at 3:58 PM Kamil ty  wrote:

> Hey,
>
> Looking at the examples at Command-Line Interface | Apache Flink
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/>
>  I
> don't see an example of passing command line arguments to a pyflink job
> when deploying the job to a remote cluster with flink cli. Is this
> supported?
>
> Best Regards
> Kamil
>


Flink CLI - pass command line arguments to a pyflink job

2021-11-22 Thread Kamil ty
Hey,

Looking at the examples at Command-Line Interface | Apache Flink
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/>
I
don't see an example of passing command line arguments to a pyflink job
when deploying the job to a remote cluster with flink cli. Is this
supported?

Best Regards
Kamil


Re: Using the flink CLI option --pyRequirements

2021-10-20 Thread Dian Fu
Hi Francis Conroy,

Do you want to debug the PyFlink job submitted via `flink run`? There is
documentation [1] on how to debug it via `PyCharm`.

PS: PyFlink supports loopback mode, which is enabled in local deployments,
i.e. when you execute PyFlink jobs locally, e.g. directly in the IDE. With
loopback mode, you can simply set a breakpoint in the source code in the IDE.
More information can be found in [2].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/debugging/#remote-debug
[2]
https://flink.apache.org/news/2021/09/29/release-1.14.0.html#loopback-mode-for-debugging
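For context, the kind of submission being discussed — `flink run` with a requirements file — can be assembled as follows (a sketch only; the binary path, script, and file names are placeholders, not taken from the thread):

```python
# Sketch: assemble the `flink run` command line that ships a requirements
# file with a PyFlink job. All paths here are placeholders.
def build_submit_cmd(flink_bin, job_py, requirements=None, jobmanager=None):
    cmd = [flink_bin, "run"]
    if jobmanager:
        cmd += ["--jobmanager", jobmanager]
    cmd += ["--python", job_py]
    if requirements:
        # With --pyRequirements, Flink installs the listed packages for the
        # Python workers before the job starts
        cmd += ["--pyRequirements", requirements]
    return cmd

print(" ".join(build_submit_cmd("./bin/flink", "word_count.py",
                                requirements="requirements.txt")))
```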

On Tue, Oct 19, 2021 at 1:34 PM Francis Conroy 
wrote:

> Hi,
>
> I'm trying to install some required modules by supplying a requirements
> file when submitting to the cluster and the CLI just seems to stall. I've
> built 1.15-SNAPSHOT@7578758fa8c84314b8b3206629b3afa9ff41b636 and have run
> the wordcount example, everything else seems to work, I just can't submit a
> pyflink job to my cluster when using the --pyRequirements option.
>
> I started going down the line of debugging the flink CLI using intellij
> idea, but wasn't able to figure out how to make my venv with pyflink
> installed available to the debug environment.
>
> Thanks,
> Francis Conroy
>
> This email and any attachments are proprietary and confidential and are
> intended solely for the use of the individual to whom it is addressed. Any
> views or opinions expressed are solely those of the author and do not
> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
> received this email in error, please let us know immediately by reply email
> and delete it from your system. You may not use, disseminate, distribute or
> copy this message nor disclose its contents to anyone.
> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
> Australia
>


Using the flink CLI option --pyRequirements

2021-10-18 Thread Francis Conroy
 Hi,

I'm trying to install some required modules by supplying a requirements
file when submitting to the cluster and the CLI just seems to stall. I've
built 1.15-SNAPSHOT@7578758fa8c84314b8b3206629b3afa9ff41b636 and have run
the wordcount example, everything else seems to work, I just can't submit a
pyflink job to my cluster when using the --pyRequirements option.

I started going down the line of debugging the flink CLI using intellij
idea, but wasn't able to figure out how to make my venv with pyflink
installed available to the debug environment.

Thanks,
Francis Conroy

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-04-06 Thread Fuyao Li
Hi Yang,

Thanks for the reply; that information is very helpful.

Best,
Fuyao

From: Yang Wang 
Date: Tuesday, April 6, 2021 at 01:11
To: Fuyao Li 
Cc: user 
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Fuyao,

Sorry for the late reply.

It is not very hard to develop your own deployer. Actually, it took me 3 days 
to develop the PoC version of flink-native-k8s-operator. So
if you want to have a fully functional K8s operator, maybe two weeks is enough. 
But if you want to put it into production, you may need
some more time to polish it for easier use.

Flink native K8s integration is not going to replace the standalone mode. 
First, not all the Flink standalone clusters are running on the K8s.
And standalone mode could work really well with reactive mode[1].


Flink native K8s integration is not going to replace the K8s operator. 
Actually, the Flink K8s operator is not on the same level as the Flink native
integration: the operator is responsible for managing the lifecycle of Flink 
applications, and it also makes job submission more K8s-style.
The Google and Lyft Flink K8s operators could support native mode; they just do 
not have that support right now.


Kubernetes HA could work both for standalone mode and native mode. You could 
find the configuration here[2]. However, you might need some changes to the 
Flink K8s operator to make it work, because we need to 
add more args (e.g. --host) to the JobManager start command.


[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
[2]. https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes

Best,
Yang


Fuyao Li <fuyao...@oracle.com> wrote on Mon, 5 Apr 2021 at 13:33:
Hello Yang,

I am just following up the previous email to see if you got some time to reply.
I also took a deeper look into the Lyft K8s operator recently. It seems it 
doesn’t support HA natively; it still needs the help of ZooKeeper. In this 
respect, native K8s is better. Any other ideas? Thanks for your help.

Best,
Fuyao

From: Fuyao Li <fuyao...@oracle.com>
Date: Thursday, April 1, 2021 at 12:22
To: Yang Wang <danrtsey...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Yang,

Thanks for sharing the insights.

For problem 1:
I think I can’t do telnet in the container. I tried to use curl 
144.25.13.78:8081
 and I could see the HTML of Flink dashboard UI. This proves such public IP is 
reachable inside the cluster. Just as you mentioned, there might still be some 
network issues with the cluster. I will do some further check.

For problem 2:
I created a new K8S cluster with bastion server with some public IP assigned to 
it. Finally, I can see something valid from my browser. (There still exist some 
problems with connecting to some databases, but I think these network problems 
are not directly related to Flink, I can investigate into it later.)

For problem 3:
Thanks for sharing the repo you created. I am not sure how much work it could 
take to develop a deployer. I understand it depends on proficiency; could 
you give a rough estimate? If it is too complicated, and some other options 
are not significantly inferior to native Kubernetes, I might prefer to choose 
other options. I am currently comparing different options to deploy in 
Kubernetes.

  1.  Standalone K8S
  2.  Native Kubernetes
  3.  Flink operator (Google Cloud Platform/ Lyft) [1][2]

I also watched the demo video you presented. [3] I noticed you mentioned that 
native K8S is not going to replace the other two options, but I still don’t 
fully get the idea from the limited explanation in the demo. Could you compare 
the tradeoffs a little bit? Thanks!
[1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
[2] https://github.com/lyft/flinkk8soperator
[3] 

Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-04-06 Thread Yang Wang
Hi Fuyao,

Sorry for the late reply.

It is not very hard to develop your own deployer. Actually, it took me 3 days
to develop the PoC version of flink-native-k8s-operator. So
if you want to have a fully functional K8s operator, maybe two weeks is
enough. But if you want to put it into production, you may need
some more time to polish it for easier use.

Flink native K8s integration is not going to replace the standalone mode.
First, not all the Flink standalone clusters are running on the K8s.
And standalone mode could work really well with reactive mode[1].


Flink native K8s integration is not going to replace the K8s operator.
Actually, the Flink K8s operator is not on the same level as the Flink native
integration: the operator is responsible for managing the lifecycle of Flink
applications, and it also makes job submission more K8s-style.
The Google and Lyft Flink K8s operators could support native mode. They just
do not have that support right now.


Kubernetes HA could work both for standalone mode and native mode. You
could find the configuration here[2]. However, you might need some changes to
the Flink K8s operator to make it work, because we need to add more args
(e.g. --host) to the JobManager start command.


[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
[2].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes

Best,
Yang


Fuyao Li  wrote on Mon, 5 Apr 2021 at 13:33:

> Hello Yang,
>
>
>
> I am just following up the previous email to see if you got some time to
> reply.
>
> I also took a deeper look into the Lyft K8s operator recently. It seems it
> doesn’t support HA natively; it still needs the help of ZooKeeper. In this
> respect, native K8s is better. Any other ideas? Thanks for your help.
>
>
>
> Best,
>
> Fuyao
>
>
>
> *From: *Fuyao Li 
> *Date: *Thursday, April 1, 2021 at 12:22
> *To: *Yang Wang 
> *Cc: *user 
> *Subject: *Re: [External] : Re: Need help with executing Flink CLI for
> native Kubernetes deployment
>
> Hi Yang,
>
>
>
> Thanks for sharing the insights.
>
>
>
> For problem 1:
>
> I think I can’t do telnet in the container. I tried to use curl
> 144.25.13.78:8081 and I could see the HTML of Flink dashboard UI. This
> proves such public IP is reachable inside the cluster. Just as you
> mentioned, there might still be some network issues with the cluster. I
> will do some further check.
>
>
>
> For problem 2:
>
> I created a new K8S cluster with bastion server with some public IP
> assigned to it. Finally, I can see something valid from my browser. (There
> still exist some problems with connecting to some databases, but I think
> these network problems are not directly related to Flink, I can investigate
> into it later.)
>
>
>
> For problem 3:
>
> Thanks for sharing the repo you created. I am not sure how much work it
> could take to develop a deployer. I understand it depends on proficiency;
> could you give a rough estimate? If it is too complicated, and some other
> options are not significantly inferior to native Kubernetes, I might prefer
> to choose other options. I am currently comparing different
> options to deploy in Kubernetes.
>
>1. Standalone K8S
>2. Native Kubernetes
>3. Flink operator (Google Cloud Platform/ Lyft) [1][2]
>
>
>
> I also watched the demo video you presented. [3] I noticed you mentioned
> that native K8S is not going to replace the other two options, but I still
> don’t fully get the idea from the limited explanation in the demo. Could you
> compare the tradeoffs a little bit? Thanks!
>
> [1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
>
> [2] https://github.com/lyft/flinkk8soperator
>
> [3] https://youtu.be/pdFPr_VOWTU
>
>
>
> Best,
>
> Fuyao
>
>
>
>
>
> *From: *Yang Wang 
> *Date: *Tuesday, March 30, 2021 at 19:15
> *To: *Fuyao Li 
> *Cc: *user 
> *Subject: *Re: [External] : Re: Need help with executing Flink CLI for
> native Kubernetes deployment
>
> Hi Fuyao,
>
>
>
> Thanks for sharing the progress.
>
>
>
> 1. The flink client is able to list/cancel jobs, based on logs shared
> 

Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-04-04 Thread Fuyao Li
Hello Yang,

I am just following up the previous email to see if you got some time to reply.
I also took a deeper look into the Lyft K8s operator recently. It seems it 
doesn’t support HA natively; it still needs the help of ZooKeeper. In this 
respect, native K8s is better. Any other ideas? Thanks for your help.

Best,
Fuyao

From: Fuyao Li 
Date: Thursday, April 1, 2021 at 12:22
To: Yang Wang 
Cc: user 
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Yang,

Thanks for sharing the insights.

For problem 1:
I think I can’t do telnet in the container. I tried to use curl 
144.25.13.78:8081 and I could see the HTML of Flink dashboard UI. This proves 
such public IP is reachable inside the cluster. Just as you mentioned, there 
might still be some network issues with the cluster. I will do some further 
check.

For problem 2:
I created a new K8S cluster with bastion server with some public IP assigned to 
it. Finally, I can see something valid from my browser. (There still exist some 
problems with connecting to some databases, but I think these network problems 
are not directly related to Flink, I can investigate into it later.)

For problem 3:
Thanks for sharing the repo you created. I am not sure how much work it could 
take to develop a deployer. I understand it depends on proficiency; could 
you give a rough estimate? If it is too complicated, and some other options 
are not significantly inferior to native Kubernetes, I might prefer to choose 
other options. I am currently comparing different options to deploy in 
Kubernetes.

  1.  Standalone K8S
  2.  Native Kubernetes
  3.  Flink operator (Google Cloud Platform/ Lyft) [1][2]

I also watched the demo video you presented. [3] I noticed you mentioned that 
native K8S is not going to replace the other two options, but I still don’t 
fully get the idea from the limited explanation in the demo. Could you compare 
the tradeoffs a little bit? Thanks!
[1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
[2] https://github.com/lyft/flinkk8soperator
[3] https://youtu.be/pdFPr_VOWTU

Best,
Fuyao


From: Yang Wang 
Date: Tuesday, March 30, 2021 at 19:15
To: Fuyao Li 
Cc: user 
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Fuyao,

Thanks for sharing the progress.

1. The flink client is able to list/cancel jobs, based on logs shared above, I 
should be able to ping 144.25.13.78, why I still can NOT ping such address?

I think this is an environment problem. Actually, not every IP address can be 
tested with the "ping" command. I suggest you use "telnet 144.25.13.78:8081"
 to check the network connectivity.

2. Why is 
144.25.13.78:8081
 not accessible from outside, I mean on my laptop’s browser. I am within the 
company’s VPN and such public load balancer should expose the flink Web UI, 
right? I tried to debug the network configuration, but failed to find a reason, 
could you give me some hints?

Just like my above answer, I think you need to check the network connectivity 
via "telnet 
144.25.13.78:8081". Maybe the firewall does not allow connections from your 
local machine (e.g. your local IP is not in the whitelist of the LoadBalancer).

In production, what is the suggested approach to list and cancel jobs? The 
current manual work of “kubectl exec” into pods is not very reliable. How to 
automate this process and integrate it into CI/CD? Please share some blogs if 
there are any, thanks.

I think in a production environment you should have your own deployer, which 
will take care of submitting, listing, and cancelling jobs. The deployer could 
even help with triggering savepoints and managing the whole lifecycle of Flink 
applications. I developed a PoC of native-flink-k8s-operator[1]. It could be a 
starting point for your own deployer if you want to develop it in Java.
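A deployer along those lines would mostly talk to the JobManager's monitoring REST API. A sketch of the URL shapes involved (the base address is a placeholder; treat the exact paths as something to verify against the REST API docs for your Flink version):

```python
# Sketch: URL construction for the Flink monitoring REST API calls a custom
# deployer would issue. The base address is a placeholder; verify the paths
# against the REST API reference for your Flink version.
class FlinkRestPaths:
    def __init__(self, base):
        self.base = base.rstrip("/")

    def list_jobs(self):
        # GET -> overview of all jobs
        return f"{self.base}/jobs"

    def cancel_job(self, job_id):
        # PATCH -> cancels the running job
        return f"{self.base}/jobs/{job_id}?mode=cancel"

    def trigger_savepoint(self, job_id):
        # POST with a target directory in the request body
        return f"{self.base}/jobs/{job_id}/savepoints"

paths = FlinkRestPaths("http://144.25.13.78:8081")
print(paths.list_jobs())
```

A CI/CD pipeline can call these endpoints directly instead of shelling into a pod with "kubectl exec".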

[1]. https://github.com/wangyang0918/flink-native-k8s-operator

Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-04-01 Thread Fuyao Li
Hi Yang,

Thanks for sharing the insights.

For problem 1:
I think I can’t do telnet in the container. I tried to use curl 
144.25.13.78:8081 and I could see the HTML of Flink dashboard UI. This proves 
such public IP is reachable inside the cluster. Just as you mentioned, there 
might still be some network issues with the cluster. I will do some further 
check.

For problem 2:
I created a new K8S cluster with bastion server with some public IP assigned to 
it. Finally, I can see something valid from my browser. (There still exist some 
problems with connecting to some databases, but I think these network problems 
are not directly related to Flink, I can investigate into it later.)

For problem 3:
Thanks for sharing the repo you created. I am not sure how much work it could 
take to develop a deployer. I understand it depends on proficiency; could 
you give a rough estimate? If it is too complicated, and some other options 
are not significantly inferior to native Kubernetes, I might prefer to choose 
other options. I am currently comparing different options to deploy in 
Kubernetes.

  1.  Standalone K8S
  2.  Native Kubernetes
  3.  Flink operator (Google Cloud Platform/ Lyft) [1][2]

I also watched the demo video you presented. [3] I noticed you mentioned that 
native K8S is not going to replace the other two options, but I still don’t 
fully get the idea from the limited explanation in the demo. Could you compare 
the tradeoffs a little bit? Thanks!
[1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
[2]  https://github.com/lyft/flinkk8soperator
[3] https://youtu.be/pdFPr_VOWTU

Best,
Fuyao


From: Yang Wang 
Date: Tuesday, March 30, 2021 at 19:15
To: Fuyao Li 
Cc: user 
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Fuyao,

Thanks for sharing the progress.

1. The flink client is able to list/cancel jobs, based on logs shared above, I 
should be able to ping 144.25.13.78, why I still can NOT ping such address?

I think this is an environment problem. Actually, not every IP address can be 
tested with the "ping" command. I suggest you use "telnet 144.25.13.78:8081"
 to check the network connectivity.

2. Why is 
144.25.13.78:8081
 not accessible from outside, I mean on my laptop’s browser. I am within the 
company’s VPN and such public load balancer should expose the flink Web UI, 
right? I tried to debug the network configuration, but failed to find a reason, 
could you give me some hints?

Just like my above answer, I think you need to check the network connectivity 
via "telnet 
144.25.13.78:8081". Maybe the firewall does not allow connections from your 
local machine (e.g. your local IP is not in the whitelist of the LoadBalancer).

In production, what is the suggested approach to list and cancel jobs? The 
current manual work of “kubectl exec” into pods is not very reliable. How to 
automate this process and integrate it into CI/CD? Please share some blogs if 
there are any, thanks.

I think in a production environment you should have your own deployer, which 
will take care of submitting, listing, and cancelling jobs. The deployer could 
even help with triggering savepoints and managing the whole lifecycle of Flink 
applications. I developed a PoC of native-flink-k8s-operator[1]. It could be a 
starting point for your own deployer if you want to develop it in Java.

[1]. https://github.com/wangyang0918/flink-native-k8s-operator


Best,
Yang

Fuyao Li <fuyao...@oracle.com> wrote on Wed, 31 Mar 2021 at 06:37:
Hello Yang,

Thank you so much for providing me the flink-client.yaml. I was able to make 
some progress. I didn’t realize I should create a new flink-client pod to 
list/cancel jobs; I was trying to do this from my local laptop, which may be 
the reason why it didn’t work. However, I still have several questions.

I created the deployment based on your flink-client.yaml
For the LoadBalancer mode:

After applying the cluster role binding YAML below:

# https://kubernetes.io/docs/reference/access-authn-authz/rbac/
# https://stackoverflow.com/questions/47973570/kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services

Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-03-30 Thread Yang Wang
N and such public load
>balancer should expose the flink Web UI, right? I tried to debug the
>network configuration, but failed to find a reason, could you give me some
>hints?
>3. In production, what is the suggested approach to list and cancel
>jobs? The current manual work of “kubectl exec” into pods is not very
>reliable.. How to automate this process and integrate this CI/CD? Please
>share some blogs there is any, thanks.
>
>
>
>
>
> Best,
>
> Fuyao
>
>
>
> *From: *Yang Wang 
> *Date: *Monday, March 29, 2021 at 20:40
> *To: *Fuyao Li 
> *Cc: *user 
> *Subject: *[External] : Re: Need help with executing Flink CLI for native
> Kubernetes deployment
>
> Hi Fuyao,
>
>
>
> Thanks for trying the native Kubernetes integration.
>
>
>
> Just like you know, the Flink rest service could be exposed in following
> three types, configured via "kubernetes.rest-service.exposed.type".
>
>
>
> * ClusterIP, which means you could only access the Flink rest endpoint
> inside the K8s cluster. Simply, users could start a Flink client in the
>
> K8s cluster via the following yaml file. And use "kubectl exec" to tunnel
> in the pod to create a Flink session/application cluster. Also the
>
> "flink list/cancel" could work well.
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-client
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: flink-client
>   template:
>     metadata:
>       labels:
>         app: flink-client
>     spec:
>       containers:
>       - name: client
>         image: flink:1.12.2
>         imagePullPolicy: Always
>         args: ["sleep", "86400"]
>
>
>
> * NodePort
>
> Currently, we have a limitation that only the Kubernetes master nodes
> could be used to build the Flink exposed rest endpoint. So if your
>
> APIServer node does not have the kube proxy, then the printed URL in the
> Flink client logs could not be used. We already have a ticket[1] to
>
> support one of the slave nodes for accessing the rest endpoint. But I have
> not managed myself to get it done.
>
>
>
> * LoadBalancer
>
> Is the resolved rest endpoint "http://144.25.13.78:8081/"
> accessible on your Flink client side? If it is yes, then I think the Flink
> client
>
> should be able to contact to JobManager rest server to list/cancel the
> jobs. I have verified in Alibaba container service, and it works well.
>
>
>
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-16601
>
>
>
>
>
> Best,
>
> Yang
>
>
>
> Fuyao Li  wrote on Sat, 27 Mar 2021 at 05:59:
>
> Hi Community, Yang,
>
>
>
> I am new to Flink on native Kubernetes and I am trying to do a POC for
> native Kubernetes application mode on Oracle Cloud Infrastructure. I was
> following the documentation here step by step: [1]
>
>
>
> I am using Flink 1.12.1, Scala 2.11, java 11.
>
> I was able to create a native Kubernetes Deployment, but I am not able to
> use any further commands like list / cancel etc.. I always run into timeout
> error. I think the issue could be the JobManager Web Interface IP address
> printed after job deployment is not accessible. This issue is causing me
> not able to shut down the deployment with a savepoint. It could be
> Kubernetes configuration issue. I have exposed all related ports traffic
> and validated the security list, but still couldn’t make it work. Any help
> is appreciated.
>
>
>
>
>
> The relevant Flink source code is CliFrontend.java class [2]
>
> The ./bin/flink list and cancel command is trying to send traffic to the
> Flink dashboard UI IP address and it gets timeout. I tried to both
> LoadBalancer and NodePort option for
> -Dkubernetes.rest-service.exposed.type configuration. Both of them
> doesn’t work.
>
>
>
> # List running job on the cluster (I can’t execute this command
> successfully due to timeout, logs shared below)
>
> $ ./bin/flink list --target kubernetes-application
> -Dkubernetes.cluster-id=my-first-application-cluster
>
> # Cancel running job (I can’t execute this command succcessfully)
>
> $ ./bin/flink cancel --target kubernetes-application
> -Dkubernetes.cluster-id=my-first-application-cluster 
>
>
>
> I think those com

Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-03-30 Thread Fuyao Li
Hello Yang,

Thank you so much for providing me the flink-client.yaml. I was able to make 
some progress. I didn’t realize I should create a new flink-client pod to 
list/cancel jobs; I was trying to do this from my local laptop, which may be 
the reason why it didn’t work. However, I still have several questions.

I created the deployment based on your flink-client.yaml
For the LoadBalancer mode:

After applying the cluster role binding yaml below.

# https://kubernetes.io/docs/reference/access-authn-authz/rbac/
# 
https://stackoverflow.com/questions/47973570/kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: service-reader
rules:
- apiGroups: [""] # "" indicates the core API group
  resources: ["services"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]

And execute the command:
kubectl create clusterrolebinding service-reader-pod  
--clusterrole=service-reader  --serviceaccount=default:default

I am able to exec into the flink-client pod and list/cancel jobs.

$ kubectl exec -it flink-client-776886cf4f-9h47f bash
kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future 
version. Use kubectl exec [POD] -- [COMMAND] instead.
root@flink-client-776886cf4f-9h47f:/opt/flink# ./bin/flink list --target 
kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
2021-03-30 21:53:14,513 INFO  
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
flink cluster my-first-application-cluster successfully, JobManager Web 
Interface: http://144.25.13.78:8081
Waiting for response...
-- Running/Restarting Jobs ---
24.03.2021 00:13:04 : eea39629a1931b67eb395207739455ce : Flink Streaming Java 
API Skeleton (RUNNING)
--
No scheduled jobs.
root@flink-client-776886cf4f-9h47f:/opt/flink# ping 144.25.13.78
PING 144.25.13.78 (144.25.13.78) 56(84) bytes of data.

^C
--- 144.25.13.78 ping statistics ---
31 packets transmitted, 0 received, 100% packet loss, time 772ms

Question:

  1.  The flink client is able to list/cancel jobs, so based on the logs
shared above I should be able to ping 144.25.13.78. Why can I still not ping
that address?
  2.  Why is 144.25.13.78:8081 not accessible from outside, i.e. in my
laptop’s browser? I am within the company’s VPN, and such a public load
balancer should expose the Flink Web UI, right? I tried to debug the network
configuration but failed to find a reason; could you give me some hints?
  3.  In production, what is the suggested approach to list and cancel jobs?
The current manual work of “kubectl exec” into pods is not very reliable. How
can this process be automated and integrated into CI/CD? Please share some
blog posts if there are any, thanks.
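On the automation question, one hedged sketch of what a CI/CD step could run, building on the same kubectl-exec approach but non-interactively. The Deployment name `flink-client`, the cluster id, and the job id below are taken from this thread; a real pipeline would execute the printed commands against the cluster.

```shell
# Sketch only: script list/cancel for CI/CD instead of an interactive
# "kubectl exec". Names come from this thread; adapt them to your setup.
CLUSTER_ID="my-first-application-cluster"
TARGET_ARGS="--target kubernetes-application -Dkubernetes.cluster-id=${CLUSTER_ID}"

# Non-interactive exec against the Deployment (no concrete pod name needed):
LIST_CMD="kubectl exec deploy/flink-client -- ./bin/flink list ${TARGET_ARGS}"
echo "${LIST_CMD}"

# Cancelling needs the job id, which a pipeline would parse from the list output;
# the id below is the example id from the list output earlier in this thread.
JOB_ID="eea39629a1931b67eb395207739455ce"
CANCEL_CMD="kubectl exec deploy/flink-client -- ./bin/flink cancel ${TARGET_ARGS} ${JOB_ID}"
echo "${CANCEL_CMD}"
```

The printed strings are exactly what a CI shell step would run once it has cluster credentials.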


Best,
Fuyao

From: Yang Wang 
Date: Monday, March 29, 2021 at 20:40
To: Fuyao Li 
Cc: user 
Subject: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Fuyao,

Thanks for trying the native Kubernetes integration.

As you know, the Flink rest service can be exposed in the following three
ways, configured via "kubernetes.rest-service.exposed.type".

* ClusterIP, which means you can only access the Flink rest endpoint inside
the K8s cluster. Simply put, users can start a Flink client in the
K8s cluster via the following yaml file, and use "kubectl exec" to tunnel
into the pod to create a Flink session/application cluster. The
"flink list/cancel" commands also work well.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-client
  template:
    metadata:
      labels:
        app: flink-client
    spec:
      containers:
      - name: client
        image: flink:1.12.2
        imagePullPolicy: Always
        args: ["sleep", "86400"]
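For completeness, deploying this client and tunnelling into it takes two kubectl commands. A sketch, assuming the yaml above is saved as flink-client.yaml (the filename is an assumption):

```shell
# Sketch: deploy the client Deployment above and open a shell in its pod.
# Assumes the yaml is saved locally as flink-client.yaml.
APPLY_CMD="kubectl apply -f flink-client.yaml"
EXEC_CMD="kubectl exec -it deploy/flink-client -- bash"
echo "${APPLY_CMD}"
echo "${EXEC_CMD}"
```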

* NodePort
Currently, we have a limitation that only the Kubernetes master nodes can be
used to build the exposed Flink rest endpoint. So if your
APIServer node does not have the kube proxy, then the printed URL in the
Flink client logs cannot be used. We already have a ticket [1] to
support one of the slave nodes for accessing the rest endpoint, but I have
not yet managed to get it done.

* LoadBalancer
Is the resolved rest endpoint "http://144.25.13.78:8081/" accessible on your
Flink client side? If it is, then I think the Flink client
should be able to contact the JobManager rest server to list/cancel the jobs.
I have verified this in Alibaba container service, and it works well.


[1]. 
https://issu

Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-03-29 Thread Yang Wang
Hi Fuyao,

Thanks for trying the native Kubernetes integration.

As you know, the Flink rest service can be exposed in the following three
ways, configured via "kubernetes.rest-service.exposed.type".

* ClusterIP, which means you can only access the Flink rest endpoint
inside the K8s cluster. Simply put, users can start a Flink client in the
K8s cluster via the following yaml file, and use "kubectl exec" to tunnel
into the pod to create a Flink session/application cluster. The
"flink list/cancel" commands also work well.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-client
  template:
    metadata:
      labels:
        app: flink-client
    spec:
      containers:
      - name: client
        image: flink:1.12.2
        imagePullPolicy: Always
        args: ["sleep", "86400"]

* NodePort
Currently, we have a limitation that only the Kubernetes master nodes can
be used to build the exposed Flink rest endpoint. So if your
APIServer node does not have the kube proxy, then the printed URL in the
Flink client logs cannot be used. We already have a ticket [1] to
support one of the slave nodes for accessing the rest endpoint, but I have
not yet managed to get it done.

* LoadBalancer
Is the resolved rest endpoint "http://144.25.13.78:8081/" accessible on
your Flink client side? If it is, then I think the Flink client
should be able to contact the JobManager rest server to list/cancel the
jobs. I have verified this in Alibaba container service, and it works well.
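One way to check reachability of the resolved endpoint from the client side is to probe the REST API with curl rather than ping, since load balancers frequently drop ICMP and a failed ping alone is not conclusive. A sketch, using the endpoint from this thread (the /config and /jobs/overview paths are part of Flink's monitoring REST API):

```shell
# Sketch: probe the JobManager REST API instead of relying on ping.
# The IP is the resolved LoadBalancer endpoint from this thread; replace it.
REST="http://144.25.13.78:8081"
PROBE_CONFIG="curl -s --max-time 10 ${REST}/config"        # reports the server's flink-version
PROBE_JOBS="curl -s --max-time 10 ${REST}/jobs/overview"   # lists jobs, like 'flink list'
echo "${PROBE_CONFIG}"
echo "${PROBE_JOBS}"
```

If these curls time out from the client host, the timeout in `flink list` is a plain network reachability problem, not a Flink one.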


[1]. https://issues.apache.org/jira/browse/FLINK-16601


Best,
Yang

Fuyao Li wrote on Sat, Mar 27, 2021 at 5:59 AM:

> Hi Community, Yang,
>
>
>
> I am new to Flink on native Kubernetes and I am trying to do a POC for
> native Kubernetes application mode on Oracle Cloud Infrastructure. I was
> following the documentation here step by step: [1]
>
>
>
> I am using Flink 1.12.1, Scala 2.11, and Java 11.
>
> I was able to create a native Kubernetes Deployment, but I am not able to
> use any further commands like list / cancel etc. I always run into a
> timeout error. I think the issue could be that the JobManager Web Interface
> IP address printed after job deployment is not accessible. This issue
> prevents me from shutting down the deployment with a savepoint. It could be
> a Kubernetes configuration issue. I have opened traffic on all related
> ports and validated the security list, but still couldn’t make it work. Any
> help is appreciated.
>
>
>
>
>
> The relevant Flink source code is the CliFrontend.java class [2]
>
> The ./bin/flink list and cancel commands try to send traffic to the
> Flink dashboard UI IP address and time out. I tried both the
> LoadBalancer and NodePort options for the
> -Dkubernetes.rest-service.exposed.type configuration. Neither of them
> works.
>
>
>
> # List running job on the cluster (I can’t execute this command
> successfully due to timeout, logs shared below)
>
> $ ./bin/flink list --target kubernetes-application
> -Dkubernetes.cluster-id=my-first-application-cluster
>
> # Cancel running job (I can’t execute this command successfully)
>
> $ ./bin/flink cancel --target kubernetes-application
> -Dkubernetes.cluster-id=my-first-application-cluster 
>
>
>
> I think those commands need to communicate with the endpoint that is
> printed after the job submission command.
>
>
>
>    1. Use case 1 (deploy with NodePort)
>
>
>
> # fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127
>
> $ ./bin/flink run-application \
>
> --target kubernetes-application \
>
> -Dkubernetes.cluster-id=my-first-application-cluster \
>
> -Dkubernetes.container.image=
> us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
>
> -Dkubernetes.container.image.pull-policy=IfNotPresent \
>
> -Dkubernetes.container.image.pull-secrets=ocirsecret \
>
> -Dkubernetes.rest-service.exposed.type=NodePort \
>
> -Dkubernetes.service-account=flink-service-account \
>
> local:///opt/flink/usrlib/quickstart-0.1.jar
>
>
>
>
>
> When the expose type is NodePort, the printed message says the Flink
> JobManager Web Interface is at http://192.29.104.156:30996.
> 192.29.104.156 is my Kubernetes apiserver address, and 30996 is the port
> that exposes the service. However, the Flink dashboard is not reachable at
> this address.
>
> I can only get access to the dashboard UI on each node's IP address (there
> are three nodes in my K8s cluster):
>
> 100.104.154.73:30996
>
> 100.104.154.74:30996
>
> 100.104.154.75:30996
>
>   I got the following errors when running the list command against such a
> native Kubernetes deployment; see [4]. *According to the documentation [3],
> this shouldn’t happen, since the Kubernetes API server address should also
> serve the Flink Web UI… Did I miss any Kubernetes configuration needed to
> make the Web UI available at the API server address?*
>
>
>
>
>
>    2. Use case 2 (deploy with LoadBalancer)
>
> # fuyli @ fuyli-mac in ~/Development/flin

Need help with executing Flink CLI for native Kubernetes deployment

2021-03-26 Thread Fuyao Li
Hi Community, Yang,

I am new to Flink on native Kubernetes and I am trying to do a POC for native 
Kubernetes application mode on Oracle Cloud Infrastructure. I was following the 
documentation here step by step: [1]

I am using Flink 1.12.1, Scala 2.11, and Java 11.
I was able to create a native Kubernetes Deployment, but I am not able to use
any further commands like list / cancel etc. I always run into a timeout
error. I think the issue could be that the JobManager Web Interface IP address
printed after job deployment is not accessible. This issue prevents me from
shutting down the deployment with a savepoint. It could be a Kubernetes
configuration issue. I have opened traffic on all related ports and validated
the security list, but still couldn’t make it work. Any help is appreciated.


The relevant Flink source code is the CliFrontend.java class [2].
The ./bin/flink list and cancel commands try to send traffic to the Flink
dashboard UI IP address and time out. I tried both the LoadBalancer and
NodePort options for the -Dkubernetes.rest-service.exposed.type configuration.
Neither of them works.

# List running job on the cluster (I can’t execute this command successfully 
due to timeout, logs shared below)
$ ./bin/flink list --target kubernetes-application 
-Dkubernetes.cluster-id=my-first-application-cluster
# Cancel running job (I can’t execute this command successfully)
$ ./bin/flink cancel --target kubernetes-application 
-Dkubernetes.cluster-id=my-first-application-cluster 

I think those commands need to communicate with the endpoint that is printed
after the job submission command.


  1.  Use case 1 (deploy with NodePort)

# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \

-Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
-Dkubernetes.container.image.pull-policy=IfNotPresent \
-Dkubernetes.container.image.pull-secrets=ocirsecret \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dkubernetes.service-account=flink-service-account \
local:///opt/flink/usrlib/quickstart-0.1.jar


When the expose type is NodePort, the printed message says the Flink
JobManager Web Interface is at http://192.29.104.156:30996. 192.29.104.156 is
my Kubernetes apiserver address, and 30996 is the port that exposes the
service. However, the Flink dashboard is not reachable at this address.
I can only get access to the dashboard UI on each node's IP address (there are
three nodes in my K8s cluster):
100.104.154.73:30996
100.104.154.74:30996
100.104.154.75:30996
  I got the following errors when running the list command against such a
native Kubernetes deployment; see [4]. According to the documentation [3],
this shouldn’t happen, since the Kubernetes API server address should also
serve the Flink Web UI… Did I miss any Kubernetes configuration needed to make
the Web UI available at the API server address?
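As a side note, the node IPs and the NodePort in a setup like the one above can be recovered with kubectl; in native Kubernetes mode the rest service is named after the cluster id with a `-rest` suffix. A sketch (the jsonpath expression is an assumption to adapt):

```shell
# Sketch: discover the node IPs and the NodePort of the Flink rest service.
# Service naming follows Flink's native-K8s convention <cluster-id>-rest.
CLUSTER_ID="my-first-application-cluster"
NODES_CMD="kubectl get nodes -o wide"   # INTERNAL-IP column gives the node addresses
PORT_CMD="kubectl get svc ${CLUSTER_ID}-rest -o jsonpath='{.spec.ports[0].nodePort}'"
echo "${NODES_CMD}"
echo "${PORT_CMD}"
```

Combining any node IP with the printed NodePort reproduces the working addresses listed above.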



  2.  Use case 2 (deploy with LoadBalancer)
# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \

-Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
-Dkubernetes.container.image.pull-policy=IfNotPresent \
-Dkubernetes.container.image.pull-secrets=ocirsecret \
-Dkubernetes.rest-service.exposed.type=LoadBalancer \
-Dkubernetes.service-account=flink-service-account \
local:///opt/flink/usrlib/quickstart-0.1.jar


After a while, once the external IP was resolved, it said the Flink JobManager
web interface is at the external IP (load balancer address):
http://144.25.13.78:8081
When I execute the list command, I still get an error after waiting a long
time for it to time out. See the errors here: [5]

I can still get access to NodeIP:. Given this, I tend to believe it is a
network issue, but I am still quite confused since I have already opened all
the traffic.




Reference:
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[2] 
https://github.com/apache/flink/blob/f3155e6c0213de7bf4b58a89fb1e1331dee7701a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#accessing-flinks-web-ui
[4] https://pastebin.ubuntu.com/p/WcJMwds52r/
[5] https://pastebin.ubuntu.com/p/m27BnQGXQc/


Thanks for your help in advance.

Best regards,
Fuyao




Re: Flink cli to upload flink jar and run flink jobs in the Jenkins pipeline

2021-02-03 Thread sidhant gupta
Thanks Yang for your help.

On Thu, Feb 4, 2021, 8:28 AM Yang Wang  wrote:

> Yes, if you are using the CLI (e.g. flink run/list/cancel -t yarn-session
> ...) for job management,
> it will eventually call the RestClusterClient, which can retrieve the
> leader JobManager address from ZK.
>
> Please ensure that you have specified the HA-related config options in the
> CLI via -D, or set them in the flink-conf.yaml.
>
> Best,
> Yang
>
> sidhant gupta wrote on Wed, Feb 3, 2021 at 10:02 PM:
>
>> Is it possible to use the flink CLI instead of the flink client to connect
>> to zookeeper through the network load balancer and retrieve the leader
>> JobManager address?
>>
>> On Wed, Feb 3, 2021, 12:42 PM Yang Wang  wrote:
>>
>>> I think the Flink client can make a connection with ZooKeeper via the
>>> network load balancer.
>>> The Flink client is not aware of whether it is a network load balancer
>>> or multiple ZooKeeper server addresses.
>>> After that, the Flink client will retrieve the active leader JobManager
>>> address via ZooKeeperHAService
>>> and submit the job successfully via the rest client.
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> sidhant gupta wrote on Tue, Feb 2, 2021 at 11:14 PM:
>>>
>>>> Hi
>>>>
>>>> I have a flink ECS cluster setup with HA mode using zookeeper where I
>>>> have 2 jobmanagers, out of which one will be elected as leader using
>>>> zookeeper leader election. I have one application load balancer in front of
>>>> the jobmanagers and one network load balancer in front of zookeeper.
>>>>
>>>> As per [1]
>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html>
>>>>  ,
>>>> we can provide zookeeper address in the flink cli arguments and it would
>>>> upload/ submit the jar to the leader jobmanager. But since I am using
>>>> network load balancer in front of zookeeper, I guess it is not able to make
>>>> connection with the zookeeper. Please provide suggestions or sample command
>>>> for uploading the flink job jar or run the job.
>>>>
>>>> Is there any way by which we can distinguish between the leader and
>>>> standby jobmanagers in terms of request or response?
>>>>
>>>> Can we use flink cli in jenkins to upload the jar to the flink cluster
>>>> and run the jobs?
>>>>
>>>>
>>>> [1]
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html
>>>>
>>>> Thanks
>>>> Sidhant Gupta
>>>>
>>>


Re: Flink cli to upload flink jar and run flink jobs in the Jenkins pipeline

2021-02-03 Thread Yang Wang
Yes, if you are using the CLI (e.g. flink run/list/cancel -t yarn-session
...) for job management,
it will eventually call the RestClusterClient, which can retrieve the
leader JobManager address from ZK.

Please ensure that you have specified the HA-related config options in the
CLI via -D, or set them in the flink-conf.yaml.
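For illustration, the options in question can be passed on the command line like this. The keys are Flink's standard ZooKeeper HA settings; the quorum host (the NLB in this setup) and the storage directory are placeholders, not values from a real deployment:

```shell
# Sketch: ZooKeeper HA options supplied via -D on the flink CLI.
# The NLB hostname and storage directory below are placeholders.
HA_ARGS="-Dhigh-availability=zookeeper"
HA_ARGS="${HA_ARGS} -Dhigh-availability.zookeeper.quorum=zk-nlb.example.com:2181"
HA_ARGS="${HA_ARGS} -Dhigh-availability.storageDir=s3://my-bucket/flink/ha"
echo "bin/flink list ${HA_ARGS}"
```

The same three keys can instead live in flink-conf.yaml, which keeps the CLI invocations short.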

Best,
Yang

sidhant gupta wrote on Wed, Feb 3, 2021 at 10:02 PM:

> Is it possible to use the flink CLI instead of the flink client to connect
> to zookeeper through the network load balancer and retrieve the leader
> JobManager address?
>
> On Wed, Feb 3, 2021, 12:42 PM Yang Wang  wrote:
>
>> I think the Flink client can make a connection with ZooKeeper via the
>> network load balancer.
>> The Flink client is not aware of whether it is a network load balancer or
>> multiple ZooKeeper server addresses.
>> After that, the Flink client will retrieve the active leader JobManager
>> address via ZooKeeperHAService
>> and submit the job successfully via the rest client.
>>
>> Best,
>> Yang
>>
>>
>> sidhant gupta wrote on Tue, Feb 2, 2021 at 11:14 PM:
>>
>>> Hi
>>>
>>> I have a flink ECS cluster setup with HA mode using zookeeper where I
>>> have 2 jobmanagers, out of which one will be elected as leader using
>>> zookeeper leader election. I have one application load balancer in front of
>>> the jobmanagers and one network load balancer in front of zookeeper.
>>>
>>> As per [1]
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html>
>>>  ,
>>> we can provide zookeeper address in the flink cli arguments and it would
>>> upload/ submit the jar to the leader jobmanager. But since I am using
>>> network load balancer in front of zookeeper, I guess it is not able to make
>>> connection with the zookeeper. Please provide suggestions or sample command
>>> for uploading the flink job jar or run the job.
>>>
>>> Is there any way by which we can distinguish between the leader and
>>> standby jobmanagers in terms of request or response?
>>>
>>> Can we use flink cli in jenkins to upload the jar to the flink cluster
>>> and run the jobs?
>>>
>>>
>>> [1]
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html
>>>
>>> Thanks
>>> Sidhant Gupta
>>>
>>


Re: Job submission failure via flink cli

2021-02-03 Thread Chesnay Schepler

Please make sure the client and server versions are in sync.
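A quick way to compare the two sides, as a sketch: the client version is visible in the flink-dist jar name (as in the stack trace below), and the server reports its version via the REST path /config; the JobManager host is a placeholder.

```shell
# Sketch: compare client and server Flink versions before submitting.
CLIENT_CHECK="ls lib/flink-dist*.jar"                       # e.g. flink-dist_2.11-1.11.2.jar
SERVER_CHECK="curl -s http://jobmanager-host:8081/config"   # JSON includes flink-version
echo "${CLIENT_CHECK}"
echo "${SERVER_CHECK}"
```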

On 2/3/2021 4:12 PM, sidhant gupta wrote:
I am getting the following error while running the below command with the 
attached conf/flink-conf.yaml:


bin/flink run -c firstflinkpackage.someJob ../somejob.jar arg1 arg2 arg3


2021-02-03 15:04:24,113 INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received 
JobGraph submission 9cbf97d3f9b368bf2c27a52b39601500 (Flink FHIR Mapper).
2021-02-03 15:04:24,115 INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - 
Submitting job 9cbf97d3f9b368bf2c27a52b39601500 (Flink FHIR Mapper).
2021-02-03 15:04:24,334 INFO 
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore [] - Added 
JobGraph(jobId: 9cbf97d3f9b368bf2c27a52b39601500) to ZooKeeper.
2021-02-03 15:04:24,335 INFO 
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC 
endpoint for org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_5 .
2021-02-03 15:04:24,336 INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job 
Flink FHIR Mapper (9cbf97d3f9b368bf2c27a52b39601500).
2021-02-03 15:04:24,337 INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back 
off time strategy 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
backoffTimeMS=5000) for Flink FHIR Mapper 
(9cbf97d3f9b368bf2c27a52b39601500).
2021-02-03 15:04:24,337 INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Running 
initialization on master for job Flink FHIR Mapper 
(9cbf97d3f9b368bf2c27a52b39601500).
2021-02-03 15:04:24,337 INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran 
initialization on master in 0 ms.
2021-02-03 15:04:24,461 INFO 
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore [] - 
Removed job graph 9cbf97d3f9b368bf2c27a52b39601500 from ZooKeeper.
2021-02-03 15:04:24,461 INFO 
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore [] - 
Removed job graph 9cbf97d3f9b368bf2c27a52b39601500 from ZooKeeper.
2021-02-03 15:04:24,697 ERROR 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Failed 
to submit job 9cbf97d3f9b368bf2c27a52b39601500.
org.apache.flink.runtime.client.JobExecutionException: Could not 
instantiate JobManager.
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
Source) ~[?:?]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.11.2.jar:1.11.2]

Caused by: java.lang.NullPointerException
at java.util.Collections$UnmodifiableCollection.<init>(Unknown Source) ~[?:?]
at java.util.Collections$UnmodifiableList.<init>(Unknown Source) ~[?:?]
at java.util.Collections.unmodifiableList(Unknown Source) ~[?:?]
at 
org.apache.flink.runtime.jobgraph.JobVertex.getOperatorCoordinators(JobVertex.java:352) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:232) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apach

Job submission failure via flink cli

2021-02-03 Thread sidhant gupta
I am getting the following error while running the below command with the
attached conf/flink-conf.yaml:

bin/flink run -c firstflinkpackage.someJob ../somejob.jar arg1 arg2 arg3


2021-02-03 15:04:24,113 INFO org.apache.flink.runtime.dispatcher.
StandaloneDispatcher [] - Received JobGraph submission
9cbf97d3f9b368bf2c27a52b39601500
(Flink FHIR Mapper).
2021-02-03 15:04:24,115 INFO org.apache.flink.runtime.dispatcher.
StandaloneDispatcher [] - Submitting job 9cbf97d3f9b368bf2c27a52b39601500 (
Flink FHIR Mapper).
2021-02-03 15:04:24,334 INFO org.apache.flink.runtime.jobmanager.
ZooKeeperJobGraphStore [] - Added JobGraph(jobId:
9cbf97d3f9b368bf2c27a52b39601500)
to ZooKeeper.
2021-02-03 15:04:24,335 INFO org.apache.flink.runtime.rpc.akka.
AkkaRpcService [] - Starting RPC endpoint for
org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/rpc/jobmanager_5 .
2021-02-03 15:04:24,336 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Initializing job Flink FHIR Mapper (9cbf97d3f9b368bf2c27a52b39601500).
2021-02-03 15:04:24,337 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Using restart back off time strategy
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3,
backoffTimeMS=5000) for Flink FHIR Mapper (9
cbf97d3f9b368bf2c27a52b39601500).
2021-02-03 15:04:24,337 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Running initialization on master for job Flink FHIR Mapper (9
cbf97d3f9b368bf2c27a52b39601500).
2021-02-03 15:04:24,337 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Successfully ran initialization on master in 0 ms.
2021-02-03 15:04:24,461 INFO org.apache.flink.runtime.jobmanager.
ZooKeeperJobGraphStore [] - Removed job graph 9cbf97d3f9b368bf2c27a52b39601500
from ZooKeeper.
2021-02-03 15:04:24,461 INFO org.apache.flink.runtime.jobmanager.
ZooKeeperJobGraphStore [] - Removed job graph 9cbf97d3f9b368bf2c27a52b39601500
from ZooKeeper.
2021-02-03 15:04:24,697 ERROR org.apache.flink.runtime.dispatcher.
StandaloneDispatcher [] - Failed to submit job 9
cbf97d3f9b368bf2c27a52b39601500.
org.apache.flink.runtime.client.JobExecutionException: Could not
instantiate JobManager.
at org.apache.flink.runtime.dispatcher.Dispatcher
.lambda$createJobManagerRunner$6(Dispatcher.java:398) ~[flink-dist_2.11-1.11
.2.jar:1.11.2]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
~[?:?]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
.java:1339) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
.java:107) [flink-dist_2.11-1.11.2.jar:1.11.2]
Caused by: java.lang.NullPointerException
at java.util.Collections$UnmodifiableCollection.<init>(Unknown Source) ~[?:?]
at java.util.Collections$UnmodifiableList.<init>(Unknown Source) ~[?:?]
at java.util.Collections.unmodifiableList(Unknown Source) ~[?:?]
at org.apache.flink.runtime.jobgraph.JobVertex.getOperatorCoordinators(
JobVertex.java:352) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(
ExecutionJobVertex.java:232) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.executiongraph.ExecutionGraph
.attachJobGraph(ExecutionGraph.java:814) ~[flink-dist_2.11-1.11.2.jar:1.11.2
]
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder
.buildGraph(ExecutionGraphBuilder.java:228) ~[flink-dist_2.11-1.11.2.jar:
1.11.2]
at org.apache.flink.runtime.scheduler.SchedulerBase
.createExecutionGraph(SchedulerBase.java:269) ~[flink-dist_2.11-1.11.2.jar:
1.11.2]
at org.apache.flink.runtime.scheduler.SchedulerBase
.createAndRestoreExecutionGraph(SchedulerBase.java:242) ~[flink-dist_2.11-
1.11.2.jar:1.11.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase
.java:229) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(
DefaultScheduler.java:119) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory
.createInstance(DefaultSchedulerFactory.java:103) ~[flink-dist_2.11-1.11.2
.jar:1.11.2]
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(
JobMaster.java:284) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:
272) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.jobmaster.factories.
DefaultJobMasterServiceFactory.createJobMasterService(
DefaultJobMasterServiceFactory.java:98) ~[flink-dis

Re: Flink cli to upload flink jar and run flink jobs in the Jenkins pipeline

2021-02-03 Thread sidhant gupta
Is it possible to use the flink CLI instead of the flink client to connect
to zookeeper through the network load balancer and retrieve the leader
JobManager address?

On Wed, Feb 3, 2021, 12:42 PM Yang Wang  wrote:

> I think the Flink client can make a connection with ZooKeeper via the
> network load balancer.
> The Flink client is not aware of whether it is a network load balancer or
> multiple ZooKeeper server addresses.
> After that, the Flink client will retrieve the active leader JobManager
> address via ZooKeeperHAService
> and submit the job successfully via the rest client.
>
> Best,
> Yang
>
>
> sidhant gupta wrote on Tue, Feb 2, 2021 at 11:14 PM:
>
>> Hi
>>
>> I have a flink ECS cluster setup with HA mode using zookeeper where I
>> have 2 jobmanagers, out of which one will be elected as leader using
>> zookeeper leader election. I have one application load balancer in front of
>> the jobmanagers and one network load balancer in front of zookeeper.
>>
>> As per [1]
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html>
>>  ,
>> we can provide zookeeper address in the flink cli arguments and it would
>> upload/ submit the jar to the leader jobmanager. But since I am using
>> network load balancer in front of zookeeper, I guess it is not able to make
>> connection with the zookeeper. Please provide suggestions or sample command
>> for uploading the flink job jar or run the job.
>>
>> Is there any way by which we can distinguish between the leader and
>> standby jobmanagers in terms of request or response?
>>
>> Can we use flink cli in jenkins to upload the jar to the flink cluster
>> and run the jobs?
>>
>>
>> [1]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html
>>
>> Thanks
>> Sidhant Gupta
>>
>


Re: Flink cli to upload flink jar and run flink jobs in the Jenkins pipeline

2021-02-02 Thread Yang Wang
I think the Flink client can make a connection with ZooKeeper via the
network load balancer.
The Flink client is not aware of whether it is a network load balancer or
multiple ZooKeeper server addresses.
After that, the Flink client will retrieve the active leader JobManager
address via ZooKeeperHAService
and submit the job successfully via the rest client.

Best,
Yang


sidhant gupta wrote on Tue, Feb 2, 2021 at 11:14 PM:

> Hi
>
> I have a flink ECS cluster setup with HA mode using zookeeper where I have
> 2 jobmanagers, out of which one will be elected as leader using zookeeper
> leader election. I have one application load balancer in front of the
> jobmanagers and one network load balancer in front of zookeeper.
>
> As per [1]
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html>
>  ,
> we can provide zookeeper address in the flink cli arguments and it would
> upload/ submit the jar to the leader jobmanager. But since I am using
> network load balancer in front of zookeeper, I guess it is not able to make
> connection with the zookeeper. Please provide suggestions or sample command
> for uploading the flink job jar or run the job.
>
> Is  there any way by which we can distinguish between leader and standby
> jobmanagers in terms of request or response ?
>
> Can we use flink cli in jenkins to upload the jar to the flink cluster and
> run the jobs?
>
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html
>
> Thanks
> Sidhant Gupta
>


Flink cli to upload flink jar and run flink jobs in the Jenkins pipeline

2021-02-02 Thread sidhant gupta
Hi

I have a Flink ECS cluster set up in HA mode using ZooKeeper, with
2 JobManagers, one of which is elected leader through ZooKeeper
leader election. I have one application load balancer in front of the
JobManagers and one network load balancer in front of ZooKeeper.

As per [1]
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html>
,
we can provide the ZooKeeper address in the Flink CLI arguments and it will
upload/submit the jar to the leader JobManager. But since I am using a
network load balancer in front of ZooKeeper, I suspect the client is not able
to connect to ZooKeeper. Please provide suggestions or a sample command
for uploading the Flink job jar or running the job.

Is there any way to distinguish between the leader and standby
JobManagers in terms of request or response?

Can we use the Flink CLI in Jenkins to upload the jar to the Flink cluster and
run the jobs?


[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html

Thanks
Sidhant Gupta
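For the Jenkins part of the question: a pipeline stage can invoke the Flink CLI as an ordinary shell step. A minimal sketch (the paths are placeholders, not from this thread):

```shell
# Command a Jenkins `sh` step could run after the build stage produces the jar.
# FLINK_HOME and the jar path are placeholders for this sketch.
FLINK_HOME=/opt/flink
JOB_JAR=build/libs/my-job.jar
CMD="$FLINK_HOME/bin/flink run -d $JOB_JAR"
echo "$CMD"
```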


RE: Flink cli Stop command exception

2020-12-10 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi Yang,

Thanks for the response. I will collect the jobmanager logs and share. Is the
stop command applicable only to streaming jobs? As far as I can see in the
documentation, it's mentioned for streaming jobs only.
If so, how can I handle batch jobs?


  *   Cancel a job with a savepoint (deprecated; use “stop” instead):
      ./bin/flink cancel -s [targetDirectory] <jobID>

  *   Gracefully stop a job with a savepoint (streaming jobs only):
      ./bin/flink stop [-p targetDirectory] [-d] <jobID>
Thanks,
Suchithra
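Concretely, the two forms look like this (the job ID is taken from the log below; the savepoint directory is a placeholder). Since batch jobs have no savepoints to draw, a plain cancel without a savepoint is the usual way to stop them:

```shell
JOB_ID=ec416bf906915e570ef53b242d3d0bb0                 # job ID from the log
STOP_CMD="./bin/flink stop -p /savepoints -d $JOB_ID"   # streaming: savepoint, then stop
CANCEL_CMD="./bin/flink cancel $JOB_ID"                 # batch: no savepoint, plain cancel
echo "$STOP_CMD"
echo "$CANCEL_CMD"
```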



Re: Flink cli Stop command exception

2020-12-09 Thread Yang Wang
Maybe FLINK-16626[1] is related. And it is fixed in 1.10.1 and 1.11.

[1]. https://issues.apache.org/jira/browse/FLINK-16626

Best,
Yang



Re: Flink cli Stop command exception

2020-12-09 Thread Yun Tang
Hi Suchithra,

Have you checked the JobManager log to see whether the savepoint was triggered
and why it failed to complete?

Best
Yun Tang



Flink cli Stop command exception

2020-12-09 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hello,

I am running a streaming Flink job, and I was using the cancel command with a
savepoint to cancel it. From Flink 1.10, the stop command should be used
instead of cancel.
But I sometimes get the error below. Please let me know what might be the
issue.


{"host":"cancel1-flinkcli-jobsubmission-55tgq","level":"info","log":{"message":"The
 flink command to be executed is /opt/flink/bin/flink stop -p 
/opt/flink/share/cflkt-flink/external_pvc -d ec416bf906915e570ef53b242d3d0bb0 
"},"time":"2020-09-02T12:32:19.979Z","type":"log"}
{"host":"cancel1-flinkcli-jobsubmission-55tgq","level":"info","log":{"message":"=
 Submitting the Flink job "},"time":"2020-09-02T12:32:19.983Z","type":"log"}
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by 
org.apache.hadoop.security.authentication.util.KerberosUtil 
(file:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.6.5-7.0.jar) to method 
sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of 
org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release


The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
"ec416bf906915e570ef53b242d3d0bb0".
at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:458)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:450)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:905)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.TimeoutException
at 
java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:456)
... 9 more

Thanks,
Suchithra


Re: Missing help about run-application action in Flink CLI client

2020-11-04 Thread Flavio Pompermaier
Here it is: https://issues.apache.org/jira/browse/FLINK-19969

Best,
Flavio



Re: Missing help about run-application action in Flink CLI client

2020-11-04 Thread Kostas Kloudas
Could you also post the ticket here @Flavio Pompermaier  and we will
have a look before the upcoming release.

Thanks,
Kostas



Re: Missing help about run-application action in Flink CLI client

2020-11-04 Thread Chesnay Schepler
Good find, this is an oversight in the CliFrontendParser; no help is 
printed for the run-application action.

Can you file a ticket?






Missing help about run-application action in Flink CLI client

2020-11-04 Thread Flavio Pompermaier
Hello everybody,
I was looking into the currently supported application modes when submitting a
Flink job, so I tried to use the CLI help (I'm using Flink 1.11.0), but I can't
find any help about the action "run-application" at the moment... am I wrong?
Is there any JIRA to address this missing documentation?

Best,
Flavio
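For reference, the action itself (added in Flink 1.11) takes a deployment target via `-t`; a sketch with a placeholder jar path:

```shell
# run-application deploys the job in application mode on the chosen target.
# The jar path below is a placeholder for this sketch.
JOB_JAR=/opt/flink/usrlib/my-job.jar
CMD="./bin/flink run-application -t yarn-application $JOB_JAR"
echo "$CMD"
```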


Re: (DISSCUSS) flink cli need load '--classpath' files

2020-03-08 Thread Yang Wang
I tend not to change the current behavior. For other frameworks (e.g. Hadoop,
YARN),
the arguments after the user jar are also parsed by the user's `main()`, not by
the framework client.


Best,
Yang



Re: (DISSCUSS) flink cli need load '--classpath' files

2020-03-06 Thread tison
Hi Jingsong,

I think your proposal is "--classpath can occur after the jar file".

Generally speaking, I agree that this is a painful format requirement: users
tend to ignore the order in which an option occurs. So it is +1 from
my side to loosen the constraint.

However, for the migration and implementation part, things get a bit
tricky.

For the user interface, say we allow --classpath to occur after
the jar file: at the least, the semantics change if a user passes
--classpath intending it to be a main argument.

Besides, since we build the CLI on the commons-cli library, it
would be a bit tricky to implement such special-casing logic.

I happened to run into a similar CLI problem recently, so here are some of
my thoughts about it:

1. I agree that for options, users tend to treat <arguments> the same as
[OPTIONS] and mix up the order. It would be an improvement to loosen the
constraint.
2. Then, we still have to introduce something that lets users specify their args
for the main method.
3. To achieve 2, there is a mature solution in shell scripting:
use a double dash (--) to signify the end of command options.
4. Now, if we keep <jar-file> as a positional argument, then to support
mix-ordered positional and named arguments, we might switch to another library
such as argparse4j, since commons-cli doesn't support positional arguments. An
alternative is to make <jar-file> a named argument, but then we break the
user interface twice over.

Still, it would break the user interface, so we first **MUST** start a
discussion and see what the community thinks of it and, if so, how to
integrate it. For me, reading the doc is an easy way to avoid
breaking the user interface. I don't insist on loosening the constraint.

Best,
tison.




Re: (DISSCUSS) flink cli need load '--classpath' files

2020-03-06 Thread Yang Wang
I think tison's answer is on point. All the Flink CLI options should be
specified before the user jar. We have a very clear help message:

Syntax: run [OPTIONS] <jar-file> <arguments>


Best,
Yang
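Applied to the command from the original report, that syntax means every option (including `--classpath`) goes before the jar, and anything after the jar is handed to the user's `main()`:

```shell
# Reordered version of the failing command: --classpath now precedes the jar.
DEP=file:///opt/flink/job/fastjson-1.2.66.jar
JOB_JAR=/opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar
CMD="./bin/flink run --jobmanager ip:8081 --classpath $DEP --class com.netease.java.TopSpeedWindowing --parallelism 1 --detached $JOB_JAR"
echo "$CMD"
```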



Re: (DISSCUSS) flink cli need load '--classpath' files

2020-03-06 Thread Jingsong Li
Hi tison and Aljoscha,

Do you think "--classpath must come before the jar file" is something to
improve, or something that needs documentation? Because I used to be confused.

Best,
Jingsong Lee



Re: (DISSCUSS) flink cli need load '--classpath' files

2020-03-06 Thread tison
It is because, in the implementation, command-line parsing "stopAtNonOptions",
i.e., it stops at the first non-option token, the user jar. All arguments
after it are regarded as args passed to the user's main method.

For user guidance, when you run `./bin/flink run -h`, it prints

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>

which explicitly states the format.
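The effect of "stopAtNonOptions" can be sketched with a toy scanner (illustrative Python only; Flink's real CLI uses commons-cli, and the option list below is invented for the example): once the first non-option token, the user jar, is reached, everything after it becomes program arguments.

```python
def split_cli(tokens,
              options_with_value=("--jobmanager", "--class", "--parallelism")):
    """Toy 'stop at non-options' scan: CLI options come before the jar;
    everything from the jar onward is passed to the user program."""
    cli_options = []
    i = 0
    while i < len(tokens):
        tok = tokens[i]
        if not tok.startswith("-"):
            # First non-option token: the user jar. Stop scanning here.
            return cli_options, tokens[i:]
        cli_options.append(tok)
        if tok in options_with_value and i + 1 < len(tokens):
            # This option consumes a value token as well.
            cli_options.append(tokens[i + 1])
            i += 1
        i += 1
    return cli_options, []

cmd = ["--jobmanager", "ip:8081", "--class", "a.Main",
       "job.jar", "--classpath", "file:///dep.jar"]
opts, program_args = split_cli(cmd)
# "--classpath" placed after the jar ends up among the program arguments,
# which is why it must appear before the jar to reach the CLI itself.
```

This mirrors why the original command failed: the `--classpath` flag never reached the CLI parser at all.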

Best,
tison.


tison  wrote on Fri, Mar 6, 2020 at 10:22 PM:

> I think the problem is that --classpath should be before the user jar,
> i.e., /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar
>
> Best,
> tison.
>
>
> Aljoscha Krettek  wrote on Fri, Mar 6, 2020 at 10:03 PM:
>
>> Hi,
>>
>> first a preliminary question: does the jar file contain
>> com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar
>> here?
>>
>> Best,
>> Aljoscha
>>
>> On 06.03.20 13:25, ouywl wrote:
>> > Hi all
>> >   When I start a Flink cluster in session mode, it includes JM/TM. And
>> > then I submit a job like ‘bin/flink run --jobmanager “ip:8081” --class
>> > path a.jar’. Even though a.jar is on all JM/TM and ‘bin/flink’ machines,
>> > it throws the exception “
>> > /opt/flink/bin/flink run --jobmanager ip:8081 --class
>> > com.netease.java.TopSpeedWindowing --parallelism 1 --detached
>> > /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath
>> > file:///opt/flink/job/fastjson-1.2.66.jar
>> > Starting execution of program
>> > Executing TopSpeedWindowing example with default input data set.
>> > Use --input to specify file input.
>> > java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
>> > at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> > at
>> >
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > at java.lang.reflect.Method.invoke(Method.java:498)
>> > at
>> >
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>> > at
>> >
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>> > at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>> > at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)”
>> > As I read the code, the Flink CLI has not loaded the --classpath jar,
>> > so it seems to be a bug in the Flink CLI. Do you agree?
>> > Best,
>> > Ouywl
>> >
>>
>


Re: (DISCUSS) flink cli need load '--classpath' files

2020-03-06 Thread tison
I think the problem is that --classpath should be before the user jar,
i.e., /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar

Best,
tison.


Aljoscha Krettek  wrote on Fri, Mar 6, 2020 at 10:03 PM:

> Hi,
>
> first a preliminary question: does the jar file contain
> com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar
> here?
>
> Best,
> Aljoscha
>
> On 06.03.20 13:25, ouywl wrote:
> > Hi all
> >   When I start a Flink cluster in session mode, it includes JM/TM. And
> > then I submit a job like ‘bin/flink run --jobmanager “ip:8081” --class
> > path a.jar’. Even though a.jar is on all JM/TM and ‘bin/flink’ machines,
> > it throws the exception “
> > /opt/flink/bin/flink run --jobmanager ip:8081 --class
> > com.netease.java.TopSpeedWindowing --parallelism 1 --detached
> > /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath
> > file:///opt/flink/job/fastjson-1.2.66.jar
> > Starting execution of program
> > Executing TopSpeedWindowing example with default input data set.
> > Use --input to specify file input.
> > java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
> > at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> > at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> > at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)”
> > As I read the code, the Flink CLI has not loaded the --classpath jar,
> > so it seems to be a bug in the Flink CLI. Do you agree?
> > Best,
> > Ouywl
> >
>


Re: (DISCUSS) flink cli need load '--classpath' files

2020-03-06 Thread Jingsong Li
Hi ouywl,

As far as I know, "--classpath" should come before the jar file, i.e.:
/opt/flink/bin/flink run --jobmanager ip:8081 --class
com.netease.java.TopSpeedWindowing --parallelism 1 --detached --classpath
file:///opt/flink/job/fastjson-1.2.66.jar
/opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar

You can have a try.

Best,
Jingsong Lee

On Fri, Mar 6, 2020 at 10:03 PM Aljoscha Krettek 
wrote:

> Hi,
>
> first a preliminary question: does the jar file contain
> com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar
> here?
>
> Best,
> Aljoscha
>
> On 06.03.20 13:25, ouywl wrote:
> > Hi all
> >   When I start a Flink cluster in session mode, it includes JM/TM. And
> > then I submit a job like ‘bin/flink run --jobmanager “ip:8081” --class
> > path a.jar’. Even though a.jar is on all JM/TM and ‘bin/flink’ machines,
> > it throws the exception “
> > /opt/flink/bin/flink run --jobmanager ip:8081 --class
> > com.netease.java.TopSpeedWindowing --parallelism 1 --detached
> > /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath
> > file:///opt/flink/job/fastjson-1.2.66.jar
> > Starting execution of program
> > Executing TopSpeedWindowing example with default input data set.
> > Use --input to specify file input.
> > java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
> > at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> > at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> > at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)”
> > As I read the code, the Flink CLI has not loaded the --classpath jar,
> > so it seems to be a bug in the Flink CLI. Do you agree?
> > Best,
> > Ouywl
> >
>


-- 
Best, Jingsong Lee


Re: (DISCUSS) flink cli need load '--classpath' files

2020-03-06 Thread Aljoscha Krettek

Hi,

first a preliminary question: does the jar file contain 
com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar 
here?


Best,
Aljoscha

On 06.03.20 13:25, ouywl wrote:

Hi all
  When I start a Flink cluster in session mode, it includes JM/TM. And then I
submit a job like ‘bin/flink run --jobmanager “ip:8081” --class path a.jar’.
Even though a.jar is on all JM/TM and ‘bin/flink’ machines, it throws the
exception “
/opt/flink/bin/flink run --jobmanager ip:8081 --class
com.netease.java.TopSpeedWindowing --parallelism 1 --detached
/opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath
file:///opt/flink/job/fastjson-1.2.66.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)”
   As I read the code, the Flink CLI has not loaded the --classpath jar, so it
seems to be a bug in the Flink CLI. Do you agree?
Best,
Ouywl



(DISCUSS) flink cli need load '--classpath' files

2020-03-06 Thread ouywl






Hi all
    When I start a Flink cluster in session mode, it includes JM/TM. And then I
submit a job like ‘bin/flink run --jobmanager “ip:8081” --class path a.jar’.
Even though a.jar is on all JM/TM and ‘bin/flink’ machines, it throws the
exception “
/opt/flink/bin/flink run --jobmanager ip:8081 --class
com.netease.java.TopSpeedWindowing --parallelism 1 --detached
/opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath
file:///opt/flink/job/fastjson-1.2.66.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)”

As I read the code, the Flink CLI has not loaded the --classpath jar, so it
seems to be a bug in the Flink CLI. Do you agree?

Best,
Ouywl



 






Re: new user does not run job use flink cli

2019-07-11 Thread Biao Liu
Hi,
Do you mean job submission is OK with local user name "flink", but not for
other users?
Have you checked the permissions on "hdfs://user/flink/recovery"? I guess
the other users do not have access to it.



❤ <799326...@qq.com> wrote on Thu, Jul 11, 2019 at 11:55 AM:

> flink-conf.yaml
> jobmanager.heap.size: 1024m
> taskmanager.heap.size: 6144m
> taskmanager.numberOfTaskSlots: 3
> parallelism.default: 1
> high-availability: zookeeper
> high-availability.zookeeper.quorum: 10.1.1.15:2181,10.1.1.16:2181,
> 10.1.1.17:2181
> high-availability.zookeeper.path.root: /flink
> high-availability.cluster-id: /flink_one
> high-availability.storageDir: hdfs://user/flink/recovery
> state.checkpoints.dir: hdfs://10.1.1.5:8020/user/flink/flink-checkpoints
> state.savepoints.dir: hdfs://10.1.1.5:8020/user/flink/flink-checkpoints
>
> masters
> 10.1.1.12:8081
> 10.1.1.13:8081
> 10.1.1.14:8081
>
> slaves
> 10.1.1.12
> 10.1.1.13
> 10.1.1.14
>
> The Flink cluster is started with user "flink". The "flink" user can run any
> job OK, but another user, such as a newly added user "test", gets an error
> when running FLINK_HOME/examples/batch/WordCount.jar.
>
>


new user does not run job use flink cli

2019-07-10 Thread ❤
flink-conf.yaml
jobmanager.heap.size: 1024m
taskmanager.heap.size: 6144m
taskmanager.numberOfTaskSlots: 3
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.quorum: 10.1.1.15:2181,10.1.1.16:2181,10.1.1.17:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /flink_one
high-availability.storageDir: hdfs://user/flink/recovery
state.checkpoints.dir: hdfs://10.1.1.5:8020/user/flink/flink-checkpoints
state.savepoints.dir: hdfs://10.1.1.5:8020/user/flink/flink-checkpoints


masters
10.1.1.12:8081
10.1.1.13:8081
10.1.1.14:8081


slaves
10.1.1.12
10.1.1.13
10.1.1.14


The Flink cluster is started with user "flink". The "flink" user can run any
job OK, but another user, such as a newly added user "test", gets an error
when running FLINK_HOME/examples/batch/WordCount.jar.

Flink CLI distributed cache fault

2019-05-28 Thread Vasyl Bervetskyi
Hi there,

I ran into an issue adding a file to the distributed cache in Flink.
My setup:

-  Java 1.8

-  Flink 1.8

-  OS: Windows, Linux
Test scenario:

1.   Create simple stream environment

2.   Add to distributed cache local file

3.   Add simple source function and sink

4.   Execute job from Flink CLI (Windows/Linux)

In order to restore a job from a savepoint or a checkpoint, we need to run the
job from the Flink CLI, and pipelines that use the distributed cache fail their
execution.
Moreover, the behavior differs between Windows and Linux: on Windows we get
"java.nio.file.InvalidPathException: Illegal char <:> at index 4", and on Linux
Flink freezes (it just gets stuck and does nothing; no error message or stack
trace).

My piece of code for windows environment:


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CachePipeline {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see =
                StreamExecutionEnvironment.getExecutionEnvironment();
        see.registerCachedFile("file:///D:/test.csv", "MyFile");

        see.addSource(new SourceFunction<Integer>() {

            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                while (true) {
                    // Emit under the checkpoint lock to keep checkpoints consistent
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(5);
                    }
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {}

        }).print();

        see.execute();
    }
}

The command I used to run the job:

flink run -c test.CachePipeline D:\path\to\jar\cache-test.jar


On Linux, I changed the file location to:

see.registerCachedFile("file:///home/test.csv", "MyFile");

Windows stacktrace:

flink run -c com.CachePipeline D:\repository\cache-test.jar

log4j:WARN No appenders could be found for logger 
(org.apache.flink.client.cli.CliFrontend).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
Starting execution of program


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: 38631d859b64cd86201bbe09a32c62f3)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
at com.granduke.teleprocessing.CachePipeline.main(CachePipeline.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Unknown Source)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
at java.util.concurrent.CompletableFuture.uniExceptionally(Unknown 
Source)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown Source)
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
Source)
at java.util.concurrent

Re: Flink CLI

2019-04-26 Thread Gary Yao
Hi Steve,

(1)

The CLI action you are looking for is called "modify" [1]. However, we want
to temporarily disable this feature beginning with Flink 1.9 due to some
caveats with it [2]. If you have objections, it would be appreciated if you
could comment on the respective thread on the user/dev mailing list.

(2)

There is currently no option to have the CLI output JSON. However, as others
have pointed out, you can use the REST API to invoke actions on the cluster,
such as taking savepoints [3]. This is also what the CLI ultimately does [4].

Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Temporarily-remove-support-for-job-rescaling-via-CLI-action-quot-modify-quot-td27447.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#jobs-jobid-savepoints
[4]
https://github.com/apache/flink/blob/767fe152cb69a204261a0770412c8b28d037614d/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L415-L424
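Gary's point in [3]/[4], that the CLI's savepoint action is ultimately a plain REST call, can be sketched in a few lines of Python. The endpoint path follows the Flink 1.8 REST docs; the helper name, the example job id, and the target directory are my own illustrations, and the actual HTTP call is left commented out:

```python
import json

def savepoint_request(base_url, job_id, target_dir, cancel=False):
    """Build the URL and JSON body for POST /jobs/:jobid/savepoints."""
    url = f"{base_url}/jobs/{job_id}/savepoints"
    body = json.dumps({"target-directory": target_dir,
                       "cancel-job": cancel})
    return url, body

# Example values are hypothetical (jobmanager address and job id are mine).
url, body = savepoint_request("http://jobmanager:8081",
                              "38631d859b64cd86201bbe09a32c62f3",
                              "hdfs:///savepoints")
# To actually trigger the savepoint against a running cluster:
# import urllib.request
# req = urllib.request.Request(url, data=body.encode(),
#                              headers={"Content-Type": "application/json"})
# request_id = json.load(urllib.request.urlopen(req))["request-id"]
```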

On Wed, Apr 24, 2019 at 5:06 PM Steven Nelson 
wrote:

> Hello!
>
> I am working on automating our deployments to our Flink cluster. I had a
> couple questions about the flink cli.
>
> 1) I thought there was an "update" command that would internally manage
> the cancel with savepoint, upload new jar, restart from savepoint process.
>
> 2) Is there a way to get the Flink CLI to output its result in a JSON
> format? Right now I would need to parse the results of the "flink list"
> command to get the job id, cancel the job with savepoint, parse the results
> of that to get the savepoint filename, then restore using that. Parsing the
> output seems brittle to me.
>
> Thought?
> -Steve
>
>


Re: Flink CLI

2019-04-25 Thread Oytun Tez
I had come across flink-deployer actually, but somehow didn't want to
"learn" it... (versus just a bunch of lines in a script)

At some time with more bandwidth, we should migrate to this one and
standardize flink-deployer (and later take this to mainstream Flink :P).

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Thu, Apr 25, 2019 at 3:14 AM Marc Rooding  wrote:

> Hi Steven, Oytun
>
> You may find the tool we open-sourced last year useful. It offers
> deploying and updating jobs with savepointing.
>
> You can find it on Github: https://github.com/ing-bank/flink-deployer
>
> There’s also a docker image available in Docker Hub.
>
> Marc
> On 24 Apr 2019, 17:29 +0200, Oytun Tez , wrote:
>
> Hi Steven,
>
> As much as I am aware,
> 1) no update call. our build flow feels a little weird to us as well.
> definitely requires scripting.
> 2) we are using Flink management API remotely in our build flow to 1) get
> jobs, 2) savepoint them, 3) cancel them etc. I am going to release a Python
> script for this soon.
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Wed, Apr 24, 2019 at 11:06 AM Steven Nelson 
> wrote:
>
>> Hello!
>>
>> I am working on automating our deployments to our Flink cluster. I had a
>> couple questions about the flink cli.
>>
>> 1) I thought there was an "update" command that would internally manage
>> the cancel with savepoint, upload new jar, restart from savepoint process.
>>
>> 2) Is there a way to get the Flink CLI to output its result in a JSON
>> format? Right now I would need to parse the results of the "flink list"
>> command to get the job id, cancel the job with savepoint, parse the results
>> of that to get the savepoint filename, then restore using that. Parsing the
>> output seems brittle to me.
>>
>> Thought?
>> -Steve
>>
>>


Re: Flink CLI

2019-04-25 Thread Marc Rooding
Hi Steven, Oytun

You may find the tool we open-sourced last year useful. It offers deploying and 
updating jobs with savepointing.

You can find it on Github: https://github.com/ing-bank/flink-deployer

There’s also a docker image available in Docker Hub.

Marc
On 24 Apr 2019, 17:29 +0200, Oytun Tez , wrote:
> Hi Steven,
>
> As much as I am aware,
> 1) no update call. our build flow feels a little weird to us as well. 
> definitely requires scripting.
> 2) we are using Flink management API remotely in our build flow to 1) get 
> jobs, 2) savepoint them, 3) cancel them etc. I am going to release a Python 
> script for this soon.
>
> ---
> Oytun Tez
>
> M O T A W O R D
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> > On Wed, Apr 24, 2019 at 11:06 AM Steven Nelson  
> > wrote:
> > > Hello!
> > >
> > > I am working on automating our deployments to our Flink cluster. I had a 
> > > couple questions about the flink cli.
> > >
> > > 1) I thought there was an "update" command that would internally manage 
> > > the cancel with savepoint, upload new jar, restart from savepoint process.
> > >
> > > 2) Is there a way to get the Flink CLI to output its result in a JSON 
> > > format? Right now I would need to parse the results of the "flink list" 
> > > command to get the job id, cancel the job with savepoint, parse the 
> > > results of that to get the savepoint filename, then restore using that. 
> > > Parsing the output seems brittle to me.
> > >
> > > Thought?
> > > -Steve
> > >


Re: Flink CLI

2019-04-24 Thread Oytun Tez
Hi Steven,

As much as I am aware,
1) no update call. our build flow feels a little weird to us as well.
definitely requires scripting.
2) we are using Flink management API remotely in our build flow to 1) get
jobs, 2) savepoint them, 3) cancel them etc. I am going to release a Python
script for this soon.

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com
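Step 2 of that flow is asynchronous on the REST side: triggering a savepoint returns a trigger id, which you then poll until a location (or a failure cause) appears. A minimal sketch of parsing the poll response, assuming the async-operation response shape from the Flink REST docs (the helper name and the sample bodies are mine):

```python
import json

def parse_savepoint_status(body_json):
    """Parse GET /jobs/:jobid/savepoints/:triggerid into (done, result)."""
    doc = json.loads(body_json)
    if doc["status"]["id"] != "COMPLETED":
        return False, None  # still IN_PROGRESS; keep polling
    op = doc["operation"]
    # On success the operation carries the savepoint path; on failure,
    # a "failure-cause" entry appears instead of "location".
    return True, op.get("location") or op.get("failure-cause")

# Hypothetical sample responses for the two states:
in_progress = json.dumps({"status": {"id": "IN_PROGRESS"}})
completed = json.dumps({"status": {"id": "COMPLETED"},
                        "operation": {"location": "hdfs:///savepoints/sp-1"}})
```

Once `done` is true with a location, that path is what you pass back as the restore point when resubmitting the job.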


On Wed, Apr 24, 2019 at 11:06 AM Steven Nelson 
wrote:

> Hello!
>
> I am working on automating our deployments to our Flink cluster. I had a
> couple questions about the flink cli.
>
> 1) I thought there was an "update" command that would internally manage
> the cancel with savepoint, upload new jar, restart from savepoint process.
>
> 2) Is there a way to get the Flink CLI to output its result in a JSON
> format? Right now I would need to parse the results of the "flink list"
> command to get the job id, cancel the job with savepoint, parse the results
> of that to get the savepoint filename, then restore using that. Parsing the
> output seems brittle to me.
>
> Thought?
> -Steve
>
>


Re: Flink CLI

2019-04-24 Thread Zack Bartel
Hi Steve,
I recently solved this problem using the REST api and some python scripts. The 
script has a function "upgrade_job" which will cancel with savepoint, 
optionally upload a new jar from the local filestystem or S3, and start the job 
from the savepoint including any changes in parallelism. We've used Jenkins to 
upload new jars to S3 and automated the deployment using saltstack and the 
attached python script.

Please let me know if you find a better way to do this!

Zack 

https://github.com/zackb/code/blob/master/python/dink/dink.py 
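In the same spirit (a separate sketch of mine, not taken from dink.py): parsing the REST `GET /jobs` response gives you job ids directly, without scraping `flink list` output:

```python
import json

def running_job_ids(jobs_json):
    """Extract the ids of RUNNING jobs from a GET /jobs response body."""
    doc = json.loads(jobs_json)
    return [j["id"] for j in doc["jobs"] if j["status"] == "RUNNING"]

# A hypothetical response body in the shape the /jobs endpoint returns.
sample = json.dumps({"jobs": [
    {"id": "38631d859b64cd86201bbe09a32c62f3", "status": "RUNNING"},
    {"id": "0000000000000000000000000000dead", "status": "FINISHED"},
]})
```

Because the response is structured JSON rather than human-oriented text, this avoids the brittleness of parsing CLI output that Steven mentioned.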





Flink CLI

2019-04-24 Thread Steven Nelson
Hello!

I am working on automating our deployments to our Flink cluster. I had a
couple questions about the flink cli.

1) I thought there was an "update" command that would internally manage the
cancel with savepoint, upload new jar, restart from savepoint process.

2) Is there a way to get the Flink CLI to output its result in a JSON
format? Right now I would need to parse the results of the "flink list"
command to get the job id, cancel the job with savepoint, parse the results
of that to get the savepoint filename, then restore using that. Parsing the
output seems brittle to me.

Thought?
-Steve


Re: Breakage in Flink CLI in 1.5.0

2019-02-28 Thread sen
Hi Till:

So how can we get the right REST address and port when using HA mode
on YARN? I get it from the REST API "/jars". But when I submit a job using
`flink run -m`, it failed:

org.apache.flink.client.program.ProgramInvocationException: Could
not retrieve the execution result.
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:798)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:289)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1035)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to
submit JobGraph.
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:371)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:203)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException
... 8 more




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink CLI does not return after submitting yarn job in detached mode

2018-08-16 Thread vino yang
Hi Madhav,

Can you set the log level to DEBUG in the log4j-client configuration file?
Then post the log, and I can try to locate the problem through it.

Thanks, vino.

makelkar  wrote on Fri, Aug 17, 2018 at 1:27 AM:

> Hi Vino,
>    We should not have to specify the class name with the -c option to run
> a job in detached mode. I tried that this morning, but it also didn't work.
>
>    The Flink CLI always starts in interactive mode and somehow ignores the
> -yd option specified in yarn-cluster mode. Can someone verify this, please?
> If that's the case, it's a bug in the Flink CLI.
>
>    I have an ugly workaround where I start the Flink CLI in the background,
> and I would like to avoid doing that.
>
> Thanks,
> Madhav.
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink CLI does not return after submitting yarn job in detached mode

2018-08-16 Thread makelkar
Hi Vino,
   We should not have to specify the class name with the -c option to run
a job in detached mode. I tried that this morning, but it also didn't work.

   The Flink CLI always starts in interactive mode and somehow ignores the
-yd option specified in yarn-cluster mode. Can someone verify this, please?
If that's the case, it's a bug in the Flink CLI.

   I have an ugly workaround where I start the Flink CLI in the background,
and I would like to avoid doing that.

Thanks,
Madhav.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink CLI does not return after submitting yarn job in detached mode

2018-08-15 Thread vino yang
Hi Marvin777,

That's not quite right: this is the Flink-on-YARN single-job mode, which
should use the "-yd" parameter.

Hi Madhav,

I seem to have found the problem; the source code that produces your log is
here [1].

It relies on the check "isUsingInteractiveMode".

The source code for this method is here [2]; it returns true when "program"
is null. And when is this field null? Here [3].

So, from the source-code point of view, I suggest you explicitly specify the
class containing the main method in the CLI args.



[1]:
https://github.com/apache/flink/blob/release-1.4.2/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java#L380

[2]:
https://github.com/apache/flink/blob/release-1.4.2/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java#L276

[3]:
https://github.com/apache/flink/blob/release-1.4.2/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java#L217

Thanks, vino.

Marvin777  wrote on Thu, Aug 16, 2018 at 11:00 AM:

> Hi, Madhav,
>
>
>> ./flink-1.4.2/bin/flink run -m yarn-cluster -yd -yn 2 -yqu "default"
>>  -ytm 2048 myjar.jar
>
>
> Modified to: ./flink-1.4.2/bin/flink run -m yarn-cluster -d -yn 2 -yqu
> "default"  -ytm 2048 myjar.jar
>
>
>
> [image: image.png]
>
> madhav Kelkar  wrote on Thu, Aug 16, 2018 at 5:01 AM:
>
>> Hi there,
>>
>> I am trying to run a single Flink job on YARN in detached mode. As
>> per the docs for Flink 1.4.2, I am using -yd to do that.
>>
>> The problem I am having is that the flink bash script doesn't terminate
>> and return until I press Ctrl+C. In detached mode, I would expect the
>> Flink CLI to return as soon as the YARN job is submitted. Is there
>> something I am missing? Here is the exact output I get:
>>
>>
>>
>> ./flink-1.4.2/bin/flink run -m yarn-cluster -yd -yn 2 -yqu "default"
>>>  -ytm 2048 myjar.jar \
>>> program arguments omitted
>>>
>>>

Re: Flink CLI does not return after submitting yarn job in detached mode

2018-08-15 Thread Marvin777
Hi, Madhav,


> ./flink-1.4.2/bin/flink run -m yarn-cluster *-yd* -yn 2 -yqu "default"
>  -ytm 2048 myjar.jar


Modified to, ./flink-1.4.2/bin/flink run -m yarn-cluster -*d* -yn 2 -yqu
"default"  -ytm 2048 myjar.jar



[image: image.png]

madhav Kelkar wrote on Thu, 16 Aug 2018 at 5:01 AM:

> Hi there,
>
> I am trying to run a single flink job on YARN in detached mode. as per
> the docs for flink 1.4.2, I am using -yd to do that.
>
> The problem I am having is the flink bash script doesn't terminate
> execution and return until I press control + c. In detached mode, I would
> expect the flink CLI to return as soon as yarn job is submitted. is there
> something I am missing? here is exact output I get -
>
>
>
> ./flink-1.4.2/bin/flink run -m yarn-cluster -yd -yn 2 -yqu "default"  -ytm
>> 2048 myjar.jar \
>> program arguments omitted
>>
>>

Flink CLI does not return after submitting yarn job in detached mode

2018-08-15 Thread madhav Kelkar
Hi there,

I am trying to run a single Flink job on YARN in detached mode. As per
the docs for Flink 1.4.2, I am using -yd to do that.

The problem I am having is that the flink bash script doesn't terminate
and return until I press Ctrl+C. In detached mode, I would
expect the flink CLI to return as soon as the YARN job is submitted. Is there
something I am missing? Here is the exact output I get -



./flink-1.4.2/bin/flink run -m yarn-cluster -yd -yn 2 -yqu "default"  -ytm
> 2048 myjar.jar \
> program arguments omitted
>
>
> Using the result of 'hadoop classpath' to augment the Hadoop classpath:
> /Users/makelkar/work/hadoop-2.7.3/etc/hadoop:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/common/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/common/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/hdfs:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/hdfs/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/hdfs/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/yarn/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/yarn/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/mapreduce/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/mapreduce/*:/Users/makelkar/work/hadoop-2.7.3/contrib/capacity-scheduler/*.jar
> 2018-08-15 14:39:36,873 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
> for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-08-15 14:39:36,873 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
> for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-08-15 14:39:36,921 INFO  org.apache.hadoop.yarn.client.RMProxy
>   - Connecting to ResourceManager at /0.0.0.0:8032
> 2018-08-15 14:39:37,226 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>   - Cluster specification:
> ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048,
> numberTaskManagers=2, slotsPerTaskManager=1}
> 2018-08-15 14:39:37,651 WARN  org.apache.flink.yarn.YarnClusterDescriptor
>   - The configuration directory
> ('/Users/makelkar/work/flink/flink-1.4.2/conf') contains both LOG4J and
> Logback configuration files. Please delete or rename one of them.
> 2018-08-15 14:39:37,660 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> file:/Users/makelkar/work/flink/flink-1.4.2/conf/logback.xml to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/logback.xml
>
> 2018-08-15 14:39:37,986 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> file:/Users/makelkar/work/flink/flink-1.4.2/lib/log4j-1.2.17.jar to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/lib/log4j-1.2.17.jar
> 2018-08-15 14:39:38,011 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> file:/Users/makelkar/work/flink/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar
> to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/lib/flink-dist_2.11-1.4.2.jar
> 2018-08-15 14:39:38,586 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> file:/Users/makelkar/work/flink/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar
> to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/lib/flink-python_2.11-1.4.2.jar
> 2018-08-15 14:39:38,603 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> file:/Users/makelkar/work/flink/flink-1.4.2/conf/log4j.properties to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/log4j.properties
>
> 2018-08-15 14:39:39,002 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> file:/Users/makelkar/work/flink/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar
> to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/flink-dist_2.11-1.4.2.jar
> 2018-08-15 14:39:39,401 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> /var/folders/b6/_t_6q0vs3glcggp_8rgyxxl4gn/T/application_1534188161088_0019-flink-conf.yaml8441703337078262150.tmp
> to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/application_1534188161088_0019-flink-conf.yaml8441703337078262150.tmp
> 2018-08-15 14:39:39,836 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>   - Submitting application master
> application_1534188161088_0019
> 2018-08-15 14:39:39,858 INFO
>  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
> application application_1534188161088_0019
> 2018-08-15 14:39:39,858 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>   - Waiting for the cluste

Re: How to submit a job with dependency jars by flink cli in Flink 1.4.2?

2018-08-06 Thread Piotr Nowojski
Hi,

I’m glad that you have found a solution to your problem :)

To shorten feedback you can/should test as much logic as possible using smaller 
unit tests and some small-scale integration tests: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html

Usually there is no need to start up a full Flink cluster and submit 
your WIP job during development. You can run such end-to-end tests only once 
before committing/pushing/merging/deploying. 

Piotrek 

> On 6 Aug 2018, at 10:03, Joshua Fan  wrote:
> 
> Hi Piotr
> 
> Thank you for your kindly suggestion.
> 
> Yes, there was indeed a warning when a path like file:// is set. I later set 
> -yt to a directory, and the jars in the directory were uploaded to the TM, but 
> the flink run command failed to submit the job because of a 
> ClassNotFoundException.
> 
> I finally realize that Flink just wants the user to submit a fat jar containing 
> the job and its dependencies, rather than offering a dynamic way to upload 
> dependencies at submission time.
> 
> That is fine when I submit a job in a production environment, but in a test 
> environment users may change the business logic many times, and they do not 
> want to wait a long time (to build the fat jar with Maven, transfer it to a 
> Flink client node, and run it; I have to admit that is a long time) to test it 
> in Flink.
> 
> It seems I have to find a way to shorten the time this costs my users.
> 
> Yours Sincerely
> 
> Joshua
> 

Re: How to submit a job with dependency jars by flink cli in Flink 1.4.2?

2018-08-03 Thread Piotr Nowojski
Hi,

 -yt,--yarnship  Ship files in the specified directory
  (t for transfer)

I guess that you even got a warning in your log files:

LOG.warn("Ship directory is not a directory. Ignoring it.”);

I’m not sure, but maybe with `-yt` you do not even need to specify `-C`, just 
`-yt /home/work/xxx/lib/` should suffice:
https://stackoverflow.com/a/47412643/8149051 


Piotrek
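In command form, this suggestion would look roughly like the following (a sketch only; the paths and flags are taken from Joshua's original command, with the per-jar -C options replaced by a single -yt pointing at the lib directory):

```shell
# Sketch: ship the whole dependency directory to the YARN containers
# with -yt instead of listing individual jars with -C (paths as in the
# original report; other flags unchanged).
/usr/bin/hadoop/software/flink-1.4.2/bin/flink run \
  -m yarn-cluster -yn 1 -ys 8 -yjm 2148 -ytm 4096 -ynm jarsTest \
  -yt /home/work/xxx/lib \
  -c StreamExample \
  xxx-1.0.jar
```

Note that -yt expects a directory, not a single jar file; pointing it at a file is what triggers the "Ship directory is not a directory" warning quoted above.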

> On 3 Aug 2018, at 14:41, Joshua Fan  wrote:
> 
> hi Piotr
> 
> I have given up on using -C for this. -C requires the value to be a 
> java URL, and java URLs only support 
> file, ftp, gopher, http, https, jar, mailto and netdoc. That's why I cannot do it 
> with an hdfs location.
> 
> For the -yt option, I think I still need to do something more.
> 
> Yours
> Joshua
> 
> On Fri, Aug 3, 2018 at 8:11 PM, Joshua Fan  > wrote:
> Hi Piotr
> 
> I just tried the -yt option as you suggested, changing -C 
> file:/home/work/xxx/lib/commons-math3-3.5.jar to -yt 
> file:/home/work/xxx/lib/commons-math3-3.5.jar, but then it even fails to submit, 
> reporting the exception "Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)".
> 
> With -C the job can be submitted but cannot run on the YARN cluster; with -yt 
> it cannot even be submitted.
> 
> I am trying to change "-C 
> file:/home/work/xxx/lib/commons-math3-3.5.jar" to "-C 
> hdfs://namenode1/home/work/xxx/lib/commons-math3-3.5.jar", but a CliFrontend 
> error was caught.
> I am still on it and will report back later.
> 
> Yours
> Joshua
> 
> On Fri, Aug 3, 2018 at 7:58 PM, Piotr Nowojski  > wrote:
> Hi Joshua,
> 
> Please try (as Paul suggested) using:
> 
>  -yt,--yarnship  Ship files in the specified 
> directory
>   (t for transfer)
> 
> I guess `-yt /home/work/xxx` should solve your problem :)
> 
> Piotrek
> 
>> On 3 Aug 2018, at 13:54, Joshua Fan > > wrote:
>> 
>> Hi Piotr
>> 
>> Thank you for your advice. I submit the dependency jar from local machine, 
>> they does not exist in yarn container machine. Maybe I misunderstand the 
>> option big c, it can not do such a thing.
>> 
>> Joshua  
>> 
>>> 

Re: How to submit a job with dependency jars by flink cli in Flink 1.4.2?

2018-08-03 Thread Paul Lam




Hi Joshua, 

I think what you’re looking for is `-yt` option, which is used for distributing 
a specified directory via YARN to the TaskManager nodes. And you can find its 
description in the Flink client by executing `bin/flink`.

> -yt,--yarnship  Ship files in the specified directory (t 
> for transfer)

Best Regards, 
Paul Lam

Re: How to submit a job with dependency jars by flink cli in Flink 1.4.2?

2018-08-03 Thread Piotr Nowojski
Hi,

Are those paths:
file: '/home/work/xxx/lib/commons-math3-3.5.jar' (missing)
file: '/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar' 
(missing)

accessible from the inside of your container? 

bin/flink run --help
(…)
 -C,--classpath  Adds a URL to each user code
  classloader  on all nodes in the
  cluster. The paths must specify a
  protocol (e.g. file://) and be
  accessible on all nodes (e.g. by means
  of a NFS share). You can use this
  option multiple times for specifying
  more than one URL. The protocol must
  be supported by the {@link
  java.net.URLClassLoader}.

Another nit: maybe the problem is the single slash after “file:”. You have 
file:/home/...
while it might need to be
file:///home/...

Piotrek
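For completeness, a -C invocation that satisfies these constraints would look roughly like this (a sketch only; it assumes /mnt/nfs/flink-libs is a share mounted at the same path on every node, which is not part of the original report):

```shell
# Sketch: -C URLs must use a protocol supported by java.net.URLClassLoader
# (e.g. file://) and the path must resolve on every node, e.g. via an NFS
# mount. The /mnt/nfs/flink-libs location here is a placeholder assumption.
/usr/bin/hadoop/software/flink-1.4.2/bin/flink run \
  -m yarn-cluster -yn 1 -ys 8 -yjm 2148 -ytm 4096 -ynm jarsTest \
  -c StreamExample \
  -C file:///mnt/nfs/flink-libs/commons-math3-3.5.jar \
  -C file:///mnt/nfs/flink-libs/flink-connector-kafka-0.8_2.11-1.4.2.jar \
  xxx-1.0.jar
```

The key point is that -C only adds URLs to the user-code classloaders; unlike -yt, it does not copy any files, which is why a path that exists only on the client machine shows up as (missing) on the TaskManagers.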

> 



How to submit a job with dependency jars by flink cli in Flink 1.4.2?

2018-08-03 Thread Joshua Fan
Hi,

I'd like to submit a job with dependency jars by flink run, but it failed.

Here is the script,

/usr/bin/hadoop/software/flink-1.4.2/bin/flink run \
-m yarn-cluster -yn 1 -ys 8 -yjm 2148 -ytm 4096 -ynm jarsTest \
-c StreamExample \
-C file:/home/work/xxx/lib/commons-math3-3.5.jar \
-C file:/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar \
...
xxx-1.0.jar

As described in
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/cli.html#usage
, "-C" means to provide the dependency jar.

After I execute the command, the job is submitted successfully, but it cannot run
on the Flink cluster on YARN. The exception is like below:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
ClassLoader info: URL ClassLoader:
file: '/home/work/xxx/lib/commons-math3-3.5.jar' (missing)
file: '/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar'
(missing)
...
Class not resolvable through given classloader.
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:95)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

It appears that the two dependency jars cannot be found in the TaskManager, so I
dug into the source code, from CliFrontend to PackagedProgram to
ClusterClient to JobGraph. It seems like the dependency jars are put on the
classpath and in the userCodeClassLoader in PackagedProgram, but never uploaded
to the BlobServer in JobGraph, where the xxx-1.0.jar is uploaded.

Am I missing something? Are dependency jars not supported in Flink 1.4.2?

Hope someone can give me some hints.

Appreciate it very much.


Yours Sincerely

Joshua


Re: Flink CLI properties with HA

2018-07-18 Thread Sampath Bhat
Vino, I'm not getting any error, but my suspicion was that if I don't specify
the `high-availability.storageDir` property on the flink CLI side, then the CLI
will not be able to submit a job to the Flink cluster (HA enabled). But if I
provide this property on the CLI side, the job submission succeeds even though
the CLI cannot access the path mentioned in `high-availability.storageDir`.
So I wanted to understand the underlying implementation.

Till, thank you for the reply. It clarified my doubt.

On Tue, Jul 17, 2018 at 6:03 PM, Till Rohrmann  wrote:

> Hi Sampath,
>
> technically the client does not need to know the
> `high-availability.storageDir` to submit a job. However, due to how we
> construct the ZooKeeperHaServices it is still needed. The reason behind
> this is that we use the same services for the server and the client. Thus,
> the implementation needs to know the storageDir in both cases. The way it
> should be done is to split the HighAvailabilityServices up into client and
> server services. The former would then not depend on
> `high-availability.storageDir`.
>
> Cheers,
> Till
>
> On Tue, Jul 17, 2018 at 1:31 PM vino yang  wrote:
>
>> Hi Sampath,
>>
>> It seems Flink CLI for standalone would not access
>> *high-availability.storageDir.*
>>
>> What's the exception stack trace in your environment?
>>
>> Thanks, vino.
>>
>> 2018-07-17 15:08 GMT+08:00 Sampath Bhat :
>>
>>> Hi vino
>>>
>>> Should the flink CLI have access to the path mentioned in
>>> *high-availability.storageDir*?
>>> If my flink cluster is on set of machines and i submit my job from flink
>>> CLI from another independent machine by giving necessary details will the
>>> CLI try to access *high-availability.storageDir *path?
>>>
>>> I'm aware of the fact that flink client will connect to zookeeper to get
>>> leader address and necessary information for job submission but my
>>> confusion is with *high-availability.storageDir* and its necessity in
>>> flink CLI configuration.
>>>
>>> On Mon, Jul 16, 2018 at 2:44 PM, vino yang 
>>> wrote:
>>>
>>>> Hi Sampath,
>>>>
>>>> Flink CLI need to retrieve the JobManager leader address, so it need
>>>> to access the HA specific configuration. Because if based on Zookeeper to
>>>> implement the HA, the leader address information will fetch from Zookeeper.
>>>>
>>>> The main use of config item *high-availability.storageDir* is storage
>>>> (Job graph, checkpoint and so on). Actually, the real data is stored under
>>>> this path which used to recover purpose, zookeeper just store a state
>>>> handle.
>>>>
>>>> ---
>>>> Thanks.
>>>> vino.
>>>>
>>>>
>>>> 2018-07-16 15:28 GMT+08:00 Sampath Bhat :
>>>>
>>>>>
>>>>> -- Forwarded message --
>>>>> From: Sampath Bhat 
>>>>> Date: Fri, Jul 13, 2018 at 3:18 PM
>>>>> Subject: Flink CLI properties with HA
>>>>> To: user 
>>>>>
>>>>>
>>>>> Hello
>>>>>
>>>>> When HA is enabled in the flink cluster and if I've to submit job via
>>>>> flink CLI then in the flink-conf.yaml of flink CLI should contain this
>>>>> properties -
>>>>> high-availability: zookeeper
>>>>> high-availability.cluster-id: flink
>>>>> high-availability.zookeeper.path.root: flink
>>>>> high-availability.storageDir: 
>>>>> high-availability.zookeeper.quorum: 
>>>>>
>>>>> What is the need of high-availability.storageDir for flink CLI. Does
>>>>> this mean that even flink client should be able to access the mentioned
>>>>> path or is it some check being done on the property name?
>>>>>
>>>>> Without these properties flink cli will not be able to submit job to
>>>>> flink cluster when HA is enabled.
>>>>>
>>>>>
>>>>
>>>
>>


Re: Flink CLI properties with HA

2018-07-17 Thread Till Rohrmann
Hi Sampath,

technically the client does not need to know
`high-availability.storageDir` to submit a job. However, due to how we
construct the ZooKeeperHaServices, it is still needed. The reason is
that we use the same services for both the server and the client; thus,
the implementation needs to know the storageDir in both cases. The
proper fix would be to split the HighAvailabilityServices into client
and server services; the former would then no longer depend on
`high-availability.storageDir`.

Cheers,
Till

On Tue, Jul 17, 2018 at 1:31 PM vino yang  wrote:

> Hi Sampath,
>
> It seems that the Flink CLI for a standalone cluster does not actually
> access *high-availability.storageDir*.
>
> What's the exception stack trace in your environment?
>
> Thanks, vino.
>


Re: Flink CLI properties with HA

2018-07-17 Thread vino yang
Hi Sampath,

It seems that the Flink CLI for a standalone cluster does not actually access
*high-availability.storageDir*.

What's the exception stack trace in your environment?

Thanks, vino.

2018-07-17 15:08 GMT+08:00 Sampath Bhat :

> Hi vino
>
>> Should the Flink CLI have access to the path specified in
>> *high-availability.storageDir*?
>> If my Flink cluster runs on a set of machines and I submit my job from the
>> Flink CLI on another, independent machine by providing the necessary
>> details, will the CLI try to access the *high-availability.storageDir* path?
>>
>> I'm aware that the Flink client connects to ZooKeeper to get the leader
>> address and the information necessary for job submission, but my confusion
>> is about *high-availability.storageDir* and why it is required in the
>> Flink CLI configuration.
>


Re: Flink CLI properties with HA

2018-07-17 Thread Sampath Bhat
Hi vino

Should the Flink CLI have access to the path specified in
*high-availability.storageDir*?
If my Flink cluster runs on a set of machines and I submit my job from the
Flink CLI on another, independent machine by providing the necessary details,
will the CLI try to access the *high-availability.storageDir* path?

I'm aware that the Flink client connects to ZooKeeper to get the leader
address and the information necessary for job submission, but my confusion is
about *high-availability.storageDir* and why it is required in the Flink CLI
configuration.

On Mon, Jul 16, 2018 at 2:44 PM, vino yang  wrote:

> Hi Sampath,
>
> The Flink CLI needs to retrieve the JobManager leader address, so it needs
> access to the HA-specific configuration: when HA is implemented on top of
> ZooKeeper, the leader address information is fetched from ZooKeeper.
>
> The main use of the config item *high-availability.storageDir* is storage
> (job graph, checkpoints, and so on). The real data used for recovery is
> stored under this path; ZooKeeper just stores a state handle.
>
> ---
> Thanks.
> vino.
>
>


Re: Flink CLI properties with HA

2018-07-16 Thread vino yang
Hi Sampath,

The Flink CLI needs to retrieve the JobManager leader address, so it needs
access to the HA-specific configuration: when HA is implemented on top of
ZooKeeper, the leader address information is fetched from ZooKeeper.

The main use of the config item *high-availability.storageDir* is storage (job
graph, checkpoints, and so on). The real data used for recovery is stored
under this path; ZooKeeper just stores a state handle.
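The state-handle arrangement described above can be illustrated with a toy sketch. This is not Flink code; the directory, file name, and payload below are invented for illustration: the bulky data lives under the storage directory, and the coordinator (ZooKeeper) keeps only a small pointer to it.

```shell
# Toy illustration of the state-handle pattern (NOT Flink internals):
# the real payload (job graph, checkpoint metadata) lives under
# high-availability.storageDir; ZooKeeper stores only a small handle
# (essentially a pointer) to that file.
storage_dir=$(mktemp -d)                       # stand-in for storageDir
payload="$storage_dir/submittedJobGraph-0001"  # hypothetical file name
echo "serialized-job-graph-bytes" > "$payload"
handle="$payload"          # what the coordinator (ZooKeeper) would keep
cat "$handle"              # recovery: follow the handle to the real data
```

Recovery then only requires that whoever holds the handle can reach the storage path, which is why a shared filesystem (HDFS, S3, NFS) is normally used for storageDir.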

---
Thanks.
vino.


2018-07-16 15:28 GMT+08:00 Sampath Bhat :

>
> -- Forwarded message --
> From: Sampath Bhat 
> Date: Fri, Jul 13, 2018 at 3:18 PM
> Subject: Flink CLI properties with HA
> To: user 
>
>
> Hello
>
> When HA is enabled in the Flink cluster and I have to submit a job via
> the Flink CLI, the CLI's flink-conf.yaml must contain these properties:
> high-availability: zookeeper
> high-availability.cluster-id: flink
> high-availability.zookeeper.path.root: flink
> high-availability.storageDir: 
> high-availability.zookeeper.quorum: 
>
> What is the need of high-availability.storageDir for the Flink CLI? Does
> this mean that even the Flink client must be able to access the mentioned
> path, or is it just a check on the property name?
>
> Without these properties the Flink CLI will not be able to submit a job to
> the Flink cluster when HA is enabled.
>
>


Fwd: Flink CLI properties with HA

2018-07-16 Thread Sampath Bhat
-- Forwarded message --
From: Sampath Bhat 
Date: Fri, Jul 13, 2018 at 3:18 PM
Subject: Flink CLI properties with HA
To: user 


Hello

When HA is enabled in the Flink cluster and I have to submit a job via the
Flink CLI, the CLI's flink-conf.yaml must contain these properties:
high-availability: zookeeper
high-availability.cluster-id: flink
high-availability.zookeeper.path.root: flink
high-availability.storageDir: 
high-availability.zookeeper.quorum: 

What is the need of high-availability.storageDir for the Flink CLI? Does this
mean that even the Flink client must be able to access the mentioned path, or
is it just a check on the property name?

Without these properties the Flink CLI will not be able to submit a job to the
Flink cluster when HA is enabled.


Flink CLI properties with HA

2018-07-13 Thread Sampath Bhat
Hello

When HA is enabled in the Flink cluster and I have to submit a job via the
Flink CLI, the CLI's flink-conf.yaml must contain these properties:
high-availability: zookeeper
high-availability.cluster-id: flink
high-availability.zookeeper.path.root: flink
high-availability.storageDir: 
high-availability.zookeeper.quorum: 

What is the need of high-availability.storageDir for the Flink CLI? Does this
mean that even the Flink client must be able to access the mentioned path, or
is it just a check on the property name?

Without these properties the Flink CLI will not be able to submit a job to the
Flink cluster when HA is enabled.
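For completeness, a minimal sketch of such a CLI-side configuration. All concrete values (the quorum hosts and the storage path) are hypothetical placeholders, not values from this thread, and the `flink run` line is only indicative:

```shell
# Hypothetical CLI-side flink-conf.yaml for a ZooKeeper HA setup.
# Quorum hosts and the storageDir path are placeholders.
conf=$(mktemp)
cat > "$conf" <<'EOF'
high-availability: zookeeper
high-availability.cluster-id: flink
high-availability.zookeeper.path.root: flink
high-availability.storageDir: hdfs:///flink/ha
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
EOF
# The CLI would then be pointed at the directory holding this file, e.g.:
#   FLINK_CONF_DIR=$(dirname "$conf") flink run my-job.jar
grep -c '^high-availability' "$conf"
```

The `grep` at the end simply counts the five HA-related keys as a sanity check on the file.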


Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Till Rohrmann
> I'm not quite sure whether the rpc address is actually required for
>>> the
>>> >>> REST job submission, or only since we still rely partly on some
>>> legacy
>>> >>> code (ClusterClient). Maybe Till (cc) knows the answer to that.
>>> >>>
>>> >>> > Adding on to this point you made - " the rpc address is still
>>> >>> *required *due
>>> >>> > to some technical implementations; it may be that you can set this
>>> to
>>> >>> some
>>> >>> > arbitrary value however."
>>> >>> >
>>> >>> > For job submission to happen successfully we should give specific
>>> rpc
>>> >>> > address and not any arbitrary value. If any arbitrary value is
>>> given
>>> >>> the
>>> >>> > job submission fails with the following error -
>>> >>> > org.apache.flink.client.deployment.ClusterRetrieveException:
>>> >>> > Couldn't retrieve standalone cluster
>>> >>> >  at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:51)
>>> >>> >  at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:31)
>>> >>> >  at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:249)
>>> >>> >  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>>> >>> >  at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>>> >>> >  at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>>> >>> >  at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>> >>> >  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
>>> >>> > Caused by: java.net.UnknownHostException: flinktest-flink-jobmanager1233445: Name or service not known
>>> >>> >   (Random name flinktest-flink-jobmanager1233445)
>>> >>> >  at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
>>> >>> >  at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
>>> >>> >  at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
>>> >>> >  at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
>>> >>> >  at java.net.InetAddress.getAllByName(InetAddress.java:1192)
>>> >>> >  at java.net.InetAddress.getAllByName(InetAddress.java:1126)
>>> >>> >  at java.net.InetAddress.getByName(InetAddress.java:1076)
>>> >>> >  at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.getRpcUrl(AkkaRpcServiceUtils.java:171)
>>> >>> >  at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.getRpcUrl(AkkaRpcServiceUtils.java:136)
>>> >>> >  at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:83)
>>> >>> >  at org.apache.flink.client.program.ClusterClient.(ClusterClient.java:158)
>>> >>> >  at org.apache.flink.client.program.rest.RestClusterClient.(RestClusterClient.java:184)
>>> >>> >  at org.apache.flink.client.program.rest.RestClusterClient.(RestClusterClient.java:157)
>>> >>> >  at org.apach

Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Sampath Bhat


Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Till Rohrmann


Re: Breakage in Flink CLI in 1.5.0

2018-06-20 Thread Sampath Bhat


Re: Breakage in Flink CLI in 1.5.0

2018-06-20 Thread Till Rohrmann


Re: Breakage in Flink CLI in 1.5.0

2018-06-20 Thread Chesnay Schepler





Re: Breakage in Flink CLI in 1.5.0

2018-06-20 Thread Till Rohrmann


Re: Breakage in Flink CLI in 1.5.0

2018-06-20 Thread Chesnay Schepler

I was worried this might be the case.

The rest.port handling was simply copied from the legacy web-server, 
which explicitly allowed shutting it down.
It may (I'm not entirely sure) also not be necessary for all deployment 
modes; for example if the job is baked into the job/taskmanager images.


I'm not quite sure whether the rpc address is actually required for the 
REST job submission, or only since we still rely partly on some legacy 
code (ClusterClient). Maybe Till (cc) knows the answer to that.


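For readers following along: the options this thread keeps referring to all live in flink-conf.yaml. A minimal sketch of the client-relevant entries, with placeholder host/port values (not the thread's actual settings):

```yaml
# flink-conf.yaml — placeholder values.
# Since 1.5 the CLI submits jobs through the web server configured here.
jobmanager.rpc.address: jobmanager-host   # still read by the client (legacy ClusterClient path)
jobmanager.rpc.port: 6123
rest.address: jobmanager-host             # REST endpoint used for job submission
rest.port: 8081                           # -1 disables the web server — and with it, CLI submission
```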

Adding on to this point you made - " the rpc address is still *required *due
to some technical implementations; it may be that you can set this to some
arbitrary value however."

For job submission to happen successfully we must give the specific rpc
address, not an arbitrary value. If an arbitrary value is given, the job
submission fails with the following error -
org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't
retrieve standalone cluster
 at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:51)
 at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:31)
 at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:249)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
 at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
 at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
 at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
 at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.net.UnknownHostException: flinktest-flink-jobmanager1233445:
Name or service not known
  (Random name flinktest-flink-jobmanager1233445)
 at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
 at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
 at
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
 at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
 at java.net.InetAddress.getAllByName(InetAddress.java:1192)
 at java.net.InetAddress.getAllByName(InetAddress.java:1126)
 at java.net.InetAddress.getByName(InetAddress.java:1076)
 at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.getRpcUrl(AkkaRpcServiceUtils.java:171)
 at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.getRpcUrl(AkkaRpcServiceUtils.java:136)
 at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:83)
 at
org.apache.flink.client.program.ClusterClient.(ClusterClient.java:158)
 at
org.apache.flink.client.program.rest.RestClusterClient.(RestClusterClient.java:184)
 at
org.apache.flink.client.program.rest.RestClusterClient.(RestClusterClient.java:157)
 at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:49)
 ... 7 more


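The java.net.UnknownHostException above is a plain DNS/hosts lookup failure on the made-up jobmanager.rpc.address. A hedged sketch (not part of Flink) of a pre-flight check a wrapper script could run before invoking the CLI, so the failure surfaces as a clear message instead of a deep stack trace:

```python
import socket

def can_resolve(host: str) -> bool:
    """Return True when `host` resolves via a DNS/hosts lookup.

    Hypothetical helper: the CLI fails with java.net.UnknownHostException
    when jobmanager.rpc.address is set to a name that does not resolve,
    so checking resolution up front gives a clearer error.
    """
    try:
        socket.gethostbyname(host)
        return True
    except socket.gaierror:
        return False

# 'localhost' resolves on any sane machine; a random placeholder
# like 'flinktest-flink-jobmanager1233445' typically will not.
print(can_resolve("localhost"))
```

This only guards against the resolution failure shown above; an address that resolves but points at the wrong machine will still fail later, at connection time.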
On Wed, Jun 20, 2018 at 11:18 AM, Sampath Bhat 
wrote:


Hi Chesnay

If REST API (i.e. the web server) is mandatory for submitting jobs, then
why is there an option to set rest.port to -1? I think it should be
mandatory to set some valid port for rest.port, and the flink job manager
should not come up if a valid port is not set. Or else there must be some
way to submit jobs even if REST API (i.e. the web server) is not
instantiated.

If jobmanager.rpc.address is not required for flink client then why is it
still looking for that property in flink-conf.yaml? Isn't that a bug?
Because if we comment out the jobmanager.rpc.address and
jobmanager.rpc.port then flink client will not be able to submit the job.


On Tue, Jun 19, 2018 at 5:49 PM, Chesnay Schepler 
wrote:


In 1.5 we reworked the job-submission to go through the REST API instead
of akka.

I believe the jobmanager rpc port shouldn't be necessary anymore, the rpc
address is still *required *due to some technical implementations; it
may be that you can set this to some arbitrary value however.

As a result the REST API (i.e. the web server) must be running in order
to submit jobs.


On 19.06.2018 14:12, Sampath Bhat wrote:

Hello

I'm using Flink 1.5.0 version and Flink CLI to submit jar to flink
cluster.

In flink 1.4.2 only job manager rpc address and job manager rpc port were
sufficient for flink client to connect to job manager and submit the job.

But in flink 1.5.0 the flink client additionally requires the
rest.address and rest.port for submitting the job to job manager. What is
the advantage of this new method over the 1.4.2 method of submitting job?

Moreover if we make rest.port = -1 the web server will not be
instantiated then how should we submit the job?

Regards
Sampath







Re: Breakage in Flink CLI in 1.5.0

2018-06-19 Thread Sampath Bhat
Hi Chesnay

Adding on to this point you made - " the rpc address is still *required *due
to some technical implementations; it may be that you can set this to some
arbitrary value however."

For job submission to happen successfully we should give specific rpc
address and not any arbitrary value. If any arbitrary value is given the
job submission fails with the following error -
org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't
retrieve standalone cluster
at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:51)
at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:31)
at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:249)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.net.UnknownHostException: flinktest-flink-jobmanager1233445:
Name or service not known
 (Random name flinktest-flink-jobmanager1233445)
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
at
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
at java.net.InetAddress.getAllByName(InetAddress.java:1192)
at java.net.InetAddress.getAllByName(InetAddress.java:1126)
at java.net.InetAddress.getByName(InetAddress.java:1076)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.getRpcUrl(AkkaRpcServiceUtils.java:171)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.getRpcUrl(AkkaRpcServiceUtils.java:136)
at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:83)
at
org.apache.flink.client.program.ClusterClient.(ClusterClient.java:158)
at
org.apache.flink.client.program.rest.RestClusterClient.(RestClusterClient.java:184)
at
org.apache.flink.client.program.rest.RestClusterClient.(RestClusterClient.java:157)
at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:49)
... 7 more


On Wed, Jun 20, 2018 at 11:18 AM, Sampath Bhat 
wrote:

> Hi Chesnay
>
> If REST API (i.e. the web server) is mandatory for submitting jobs, then
> why is there an option to set rest.port to -1? I think it should be
> mandatory to set some valid port for rest.port, and the flink job manager
> should not come up if a valid port is not set. Or else there must be some
> way to submit jobs even if REST API (i.e. the web server) is not
> instantiated.
>
> If jobmanager.rpc.address is not required for flink client then why is it
> still looking for that property in flink-conf.yaml? Isn't that a bug?
> Because if we comment out the jobmanager.rpc.address and
> jobmanager.rpc.port then flink client will not be able to submit the job.
>
>
> On Tue, Jun 19, 2018 at 5:49 PM, Chesnay Schepler 
> wrote:
>
>> In 1.5 we reworked the job-submission to go through the REST API instead
>> of akka.
>>
>> I believe the jobmanager rpc port shouldn't be necessary anymore, the rpc
>> address is still *required *due to some technical implementations; it
>> may be that you can set this to some arbitrary value however.
>>
>> As a result the REST API (i.e. the web server) must be running in order
>> to submit jobs.
>>
>>
>> On 19.06.2018 14:12, Sampath Bhat wrote:
>>
>> Hello
>>
>> I'm using Flink 1.5.0 version and Flink CLI to submit jar to flink
>> cluster.
>>
>> In flink 1.4.2 only job manager rpc address and job manager rpc port were
>> sufficient for flink client to connect to job manager and submit the job.
>>
>> But in flink 1.5.0 the flink client additionally requires the
>> rest.address and rest.port for submitting the job to job manager. What is
>> the advantage of this new method over the 1.4.2 method of submitting job?
>>
>> Moreover if we make rest.port = -1 the web server will not be
>> instantiated then how should we submit the job?
>>
>> Regards
>> Sampath
>>
>>
>>
>


Re: Breakage in Flink CLI in 1.5.0

2018-06-19 Thread Sampath Bhat
Hi Chesnay

If REST API (i.e. the web server) is mandatory for submitting jobs, then
why is there an option to set rest.port to -1? I think it should be
mandatory to set some valid port for rest.port, and the flink job manager
should not come up if a valid port is not set. Or else there must be some
way to submit jobs even if REST API (i.e. the web server) is not
instantiated.

If jobmanager.rpc.address is not required for flink client then why is it
still looking for that property in flink-conf.yaml? Isn't that a bug?
Because if we comment out the jobmanager.rpc.address and
jobmanager.rpc.port then flink client will not be able to submit the job.


On Tue, Jun 19, 2018 at 5:49 PM, Chesnay Schepler 
wrote:

> In 1.5 we reworked the job-submission to go through the REST API instead
> of akka.
>
> I believe the jobmanager rpc port shouldn't be necessary anymore, the rpc
> address is still *required *due to some technical implementations; it may
> be that you can set this to some arbitrary value however.
>
> As a result the REST API (i.e. the web server) must be running in order to
> submit jobs.
>
>
> On 19.06.2018 14:12, Sampath Bhat wrote:
>
> Hello
>
> I'm using Flink 1.5.0 version and Flink CLI to submit jar to flink cluster.
>
> In flink 1.4.2 only job manager rpc address and job manager rpc port were
> sufficient for flink client to connect to job manager and submit the job.
>
> But in flink 1.5.0 the flink client additionally requires the rest.address
> and rest.port for submitting the job to job manager. What is the advantage
> of this new method over the 1.4.2 method of submitting job?
>
> Moreover if we make rest.port = -1 the web server will not be instantiated
> then how should we submit the job?
>
> Regards
> Sampath
>
>
>


Re: Breakage in Flink CLI in 1.5.0

2018-06-19 Thread Chesnay Schepler
In 1.5 we reworked the job-submission to go through the REST API instead 
of akka.


I believe the jobmanager rpc port shouldn't be necessary anymore, the 
rpc address is still /required /due to some technical implementations; 
it may be that you can set this to some arbitrary value however.


As a result the REST API (i.e. the web server) must be running in order 
to submit jobs.


On 19.06.2018 14:12, Sampath Bhat wrote:

Hello

I'm using Flink 1.5.0 version and Flink CLI to submit jar to flink 
cluster.


In flink 1.4.2 only job manager rpc address and job manager rpc port 
were sufficient for flink client to connect to job manager and submit 
the job.


But in flink 1.5.0 the flink client additionally requires the 
rest.address and rest.port for submitting the job to job manager. What 
is the advantage of this new method over the 1.4.2 method of 
submitting job?


Moreover if we make rest.port = -1 the web server will not be 
instantiated then how should we submit the job?


Regards
Sampath





Breakage in Flink CLI in 1.5.0

2018-06-19 Thread Sampath Bhat
Hello

I'm using Flink 1.5.0 version and Flink CLI to submit jar to flink cluster.

In flink 1.4.2 only job manager rpc address and job manager rpc port were
sufficient for flink client to connect to job manager and submit the job.

But in flink 1.5.0 the flink client additionally requires the rest.address
and rest.port for submitting the job to job manager. What is the advantage
of this new method over the 1.4.2 method of submitting job?

Moreover if we make rest.port = -1 the web server will not be instantiated
then how should we submit the job?

Regards
Sampath

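As Chesnay explains in this thread, 1.5 routes submission through the REST API. A hedged sketch (assuming the standard Flink REST paths — /jars/upload to push the user jar, then /jars/&lt;id&gt;/run to start it) of how a client builds those URLs from rest.address and rest.port:

```python
def submission_endpoints(host: str, port: int, jar_id: str) -> dict:
    """Sketch of the REST URLs a 1.5+ client uses instead of akka.

    /jars/upload and /jars/<id>/run are the documented Flink REST
    endpoints; host and port come from rest.address and rest.port.
    `jar_id` here is a placeholder for the id returned by the upload call.
    """
    base = f"http://{host}:{port}"
    return {
        "upload": f"{base}/jars/upload",     # POST the user jar (multipart)
        "run": f"{base}/jars/{jar_id}/run",  # POST to start the job
        "overview": f"{base}/overview",      # quick liveness check
    }

eps = submission_endpoints("jobmanager-host", 8081, "abc123_job.jar")
print(eps["run"])  # http://jobmanager-host:8081/jars/abc123_job.jar/run
```

This also makes the rest.port = -1 question concrete: with the web server disabled there is simply no base URL for the client to talk to.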

Re: Flink CLI cannot submit job to Flink on Mesos

2017-08-01 Thread Stephan Ewen
Cool, good to hear!

It is one of those "it's a feature, not a bug" situations ;-)

Flink's HA mode supports multiple masters, so the CLI needs to have a way
to find which master is "leader" (active, versus the passive masters on
standby). That discovery goes through ZooKeeper as well (which is the
ground truth for who is the leader).

Stephan


On Tue, Aug 1, 2017 at 11:36 AM, Francisco Gonzalez Barea <
francisco.gonza...@piksel.com> wrote:

> Hey! It´s working now!!
>
> I will do a summary for those who might have the same problem in the
> future:
>
> - *Flink 1.3.0 dockerized on Mesos:*
> - Add the HA configuration values in your flink app:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#config-file-flink-confyaml
> - Add the Mesos HA configuration values in your flink app:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html#high-availability
>
> -* Flink CLI 1.3.0 on my local machine* (make sure you *use the same
> version*!!)
> - Add same HA configuration values in your flink CLI configuration:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#config-file-flink-confyaml
>
>
> With those steps, my ./flink run command is working like a charm.
>
> Thank you very much guys!
>
> Regards,
> Francisco
>
>
>
>
> On 1 Aug 2017, at 10:24, Francisco Gonzalez Barea <
> francisco.gonza...@piksel.com> wrote:
>
> Hi Stephan,
>
> So, do you mean to remove the “-m” param from the flink CLI call? And on
> the other hand, that I should add the Zookeeper configuration in both
> sides, the remote flink and locally in the flink CLI config, right?
>
> Regards
>
>
> On 31 Jul 2017, at 22:21, Stephan Ewen  wrote:
>
> Hi Francisco!
>
> Can you drop the explicit address of the jobmanager? The client should
> pick up that address automatically from ZooKeeper as well (together with
> the HA leader session ID).
>
> Please check if you have the ZooKeeper HA config entries in the config
> used by the CLI.
>
> Stephan
>
>
> On Mon, Jul 31, 2017 at 6:27 PM, Francisco Gonzalez Barea <
> francisco.gonza...@piksel.com> wrote:
>
>> Hi again,
>>
>> On the other hand, we are running the following flink CLI command:
>>
>> ./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port}
>>  ${our-program-jar} ${our-program-params}
>>
>> Maybe we are using the command wrongly?
>>
>> Thank you
>>
>> On 28 Jul 2017, at 11:07, Till Rohrmann  wrote:
>>
>> Hi Francisco,
>>
>> have you set the right high-availability configuration options in your
>> client configuration as described here [1]? If not, then Flink is not able
>> to find the correct JobManager because it retrieves the address as well as
>> a fencing token (called leader session id) from the HA store (ZooKeeper).
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html#high-availability
>>
>> Cheers,
>> Till
>>
>> On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonzalez Barea <
>> francisco.gonza...@piksel.com> wrote:
>>
>>> Hello,
>>>
>>> We´re having a lot of issues while trying to submit a job remotely using
>>> the Flink CLI command line tool. We have tried different configurations but
>>> in all of them we get errors from AKKA while trying to connect. I will try
>>> to summarise the configurations we´ve tried.
>>>
>>> - Flink 1.3.0 deployed within a docker container on a Mesos cluster
>>> (using Marathon)
>>> - This flink has the property jobmanager.rpc.address as a hostname (i.e.
>>> kind of ip-X.eu-west-1.compute.internal)
>>> - Use the same version for Flink Client remotely (e.g. in my laptop).
>>>
>>> When I try to submit the job using the command flink run -m
>>> myHostName:myPort (the same in jobmanager.rpc.address and
>>> jobmanager.rpc.port) after some time waiting I get the trace at the end of
>>> this email. In the flink side we get this error from AKKA:
>>>

Re: Flink CLI cannot submit job to Flink on Mesos

2017-08-01 Thread Francisco Gonzalez Barea
Hey! It´s working now!!

I will do a summary for those who might have the same problem in the future:

- Flink 1.3.0 dockerized on Mesos:
- Add the HA configuration values in your flink app: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#config-file-flink-confyaml
- Add the Mesos HA configuration values in your flink app: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html#high-availability

- Flink CLI 1.3.0 on my local machine (make sure you use the same version!!)
- Add same HA configuration values in your flink CLI configuration: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#config-file-flink-confyaml


With those steps, my ./flink run command is working like a charm.

Thank you very much guys!

Regards,
Francisco


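The summary above, condensed into a single hedged flink-conf.yaml sketch (hosts and paths are placeholders, not the poster's actual values) — per the thread, the same entries must appear both in the cluster's config and in the config the local CLI reads:

```yaml
# HA via ZooKeeper — placeholder values; see the linked
# jobmanager_high_availability and mesos docs for the full key list.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/ha/
```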

On 1 Aug 2017, at 10:24, Francisco Gonzalez Barea 
mailto:francisco.gonza...@piksel.com>> wrote:

Hi Stephan,

So, do you mean to remove the “-m” param from the flink CLI call? And on the 
other hand, that I should add the Zookeeper configuration in both sides, the 
remote flink and locally in the flink CLI config, right?

Regards


On 31 Jul 2017, at 22:21, Stephan Ewen 
mailto:se...@apache.org>> wrote:

Hi Francisco!

Can you drop the explicit address of the jobmanager? The client should pick up 
that address automatically from ZooKeeper as well (together with the HA leader 
session ID).

Please check if you have the ZooKeeper HA config entries in the config used by 
the CLI.

Stephan


On Mon, Jul 31, 2017 at 6:27 PM, Francisco Gonzalez Barea 
mailto:francisco.gonza...@piksel.com>> wrote:
Hi again,

On the other hand, we are running the following flink CLI command:

./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port}  
${our-program-jar} ${our-program-params}

Maybe we are using the command wrongly?

Thank you

On 28 Jul 2017, at 11:07, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:

Hi Francisco,

have you set the right high-availability configuration options in your client 
configuration as described here [1]? If not, then Flink is not able to find the 
correct JobManager because it retrieves the address as well as a fencing token 
(called leader session id) from the HA store (ZooKeeper).

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html#high-availability

Cheers,
Till

On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonzalez Barea 
mailto:francisco.gonza...@piksel.com>> wrote:
Hello,

We´re having a lot of issues while trying to submit a job remotely using the
Flink CLI command line tool. We have tried different configurations but in all 
of them we get errors from AKKA while trying to connect. I will try to 
summarise the configurations we´ve tried.

- Flink 1.3.0 deployed within a docker container on a Mesos cluster (using 
Marathon)
- This flink has the property jobmanager.rpc.address as a hostname (i.e. kind 
of ip-X.eu-west-1.compute.internal)
- Use the same version for Flink Client remotely (e.g. in my laptop).

When I try to submit the job using the command flink run -m myHostName:myPort 
(the same in jobmanager.rpc.address and jobmanager.rpc.port) after some time 
waiting I get the trace at the end of this email. In the flink side we get this 
error from AKKA:

Association with remote system [akka.tcp://flink@10.203.23.24:24469] has 
failed, address is now gated for [5000] ms. Reason: [Association failed with 
[akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection refused: 
/10.203.23.24:24469]

After reading a bit, it seems there´re some problems related to akka resolving 
hostnames to ips, so we decided to startup the same flink but changing 
jobmanager.rpc.address to have the direct ip (i.e. kind of XX.XXX.XX.XX). In 
this case I´m getting same trace (at the end of the email) from the client side 
and this one from the Flink server:

Discard message 
LeaderSessionMessage(----,SubmitJob(JobGraph(jobId:
 b25d5c5ced962632abc5ee9ef867792e),DETACHED)) because the expected leader 
session ID b4f53899-5d70-467e-8e9d-e56eeb60b6e3 did not equal the received 
leader session ID ----.

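The "Discard message" log above is Flink's fencing at work: the JobManager only handles messages carrying its current leader session ID, and the client here never learned the real ID (it sent the all-zero default). A toy Python model of that check — an illustration, not Flink's actual code:

```python
from uuid import UUID

NULL_LEADER_ID = UUID(int=0)  # the all-zero id seen in the discarded message

def accept_message(expected: UUID, received: UUID) -> bool:
    """Model of the leader-session fencing check: a message is handled
    only when its leader session id matches the JobManager's current one;
    anything else is discarded, as the log above shows."""
    return expected == received

leader = UUID("b4f53899-5d70-467e-8e9d-e56eeb60b6e3")  # id from the log
print(accept_message(leader, NULL_LEADER_ID))  # False -> message discarded
```

This is why the ZooKeeper HA config must also be in the CLI's config: the client fetches the current leader session ID from ZooKeeper, and without it every submission is fenced off.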
We have tried some other stuff but without success… any clue that could help us?

Thanks in advance!

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: JobManager did not respond within 6 milliseconds

Re: Flink CLI cannot submit job to Flink on Mesos

2017-08-01 Thread Francisco Gonzalez Barea
Hi Stephan,

So, do you mean to remove the “-m” param from the flink CLI call? And on the 
other hand, that I should add the Zookeeper configuration in both sides, the 
remote flink and locally in the flink CLI config, right?

Regards


On 31 Jul 2017, at 22:21, Stephan Ewen 
mailto:se...@apache.org>> wrote:

Hi Francisco!

Can you drop the explicit address of the jobmanager? The client should pick up 
that address automatically from ZooKeeper as well (together with the HA leader 
session ID).

Please check if you have the ZooKeeper HA config entries in the config used by 
the CLI.

Stephan


On Mon, Jul 31, 2017 at 6:27 PM, Francisco Gonzalez Barea 
mailto:francisco.gonza...@piksel.com>> wrote:
Hi again,

On the other hand, we are running the following flink CLI command:

./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port}  
${our-program-jar} ${our-program-params}

Maybe we are using the command wrongly?

Thank you

On 28 Jul 2017, at 11:07, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:

Hi Francisco,

have you set the right high-availability configuration options in your client 
configuration as described here [1]? If not, then Flink is not able to find the 
correct JobManager because it retrieves the address as well as a fencing token 
(called leader session id) from the HA store (ZooKeeper).

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html#high-availability

Cheers,
Till

On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonzalez Barea 
mailto:francisco.gonza...@piksel.com>> wrote:
Hello,

We´re having a lot of issues while trying to submit a job remotely using the
Flink CLI command line tool. We have tried different configurations but in all 
of them we get errors from AKKA while trying to connect. I will try to 
summarise the configurations we´ve tried.

- Flink 1.3.0 deployed within a docker container on a Mesos cluster (using 
Marathon)
- This flink has the property jobmanager.rpc.address as a hostname (i.e. kind 
of ip-X.eu-west-1.compute.internal)
- Use the same version for Flink Client remotely (e.g. in my laptop).

When I try to submit the job using the command flink run -m myHostName:myPort 
(the same in jobmanager.rpc.address and jobmanager.rpc.port) after some time 
waiting I get the trace at the end of this email. In the flink side we get this 
error from AKKA:

Association with remote system [akka.tcp://flink@10.203.23.24:24469] has 
failed, address is now gated for [5000] ms. Reason: [Association failed with 
[akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection refused: 
/10.203.23.24:24469]

After reading a bit, it seems there´re some problems related to akka resolving 
hostnames to ips, so we decided to startup the same flink but changing 
jobmanager.rpc.address to have the direct ip (i.e. kind of XX.XXX.XX.XX). In 
this case I´m getting same trace (at the end of the email) from the client side 
and this one from the Flink server:

Discard message 
LeaderSessionMessage(----,SubmitJob(JobGraph(jobId:
 b25d5c5ced962632abc5ee9ef867792e),DETACHED)) because the expected leader 
session ID b4f53899-5d70-467e-8e9d-e56eeb60b6e3 did not equal the received 
leader session ID ----.

We have tried some other stuff but without success… any clue that could help us?

Thanks in advance!

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: JobManager did not respond within 6 milliseconds
at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:454)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:99)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at 
org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:345)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobTimeoutExc

Re: Flink CLI cannot submit job to Flink on Mesos

2017-07-31 Thread Stephan Ewen
Hi Francisco!

Can you drop the explicit address of the jobmanager? The client should pick
up that address automatically from ZooKeeper as well (together with the HA
leader session ID).

Please check if you have the ZooKeeper HA config entries in the config used
by the CLI.

Stephan


On Mon, Jul 31, 2017 at 6:27 PM, Francisco Gonzalez Barea <
francisco.gonza...@piksel.com> wrote:

> Hi again,
>
> On the other hand, we are running the following flink CLI command:
>
> ./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port}
>  ${our-program-jar} ${our-program-params}
>
> Maybe we are using the command wrongly?
>
> Thank you
>
> On 28 Jul 2017, at 11:07, Till Rohrmann  wrote:
>
> Hi Francisco,
>
> have you set the right high-availability configuration options in your
> client configuration as described here [1]? If not, then Flink is not able
> to find the correct JobManager because it retrieves the address as well as
> a fencing token (called leader session id) from the HA store (ZooKeeper).
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html#high-availability
>
> Cheers,
> Till
>
> On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonzalez Barea <
> francisco.gonza...@piksel.com> wrote:
>
>> Hello,
>>
>> We´re having a lot of issues while trying to submit a job remotely using
>> the Flink CLI command line tool. We have tried different configurations but
>> in all of them we get errors from AKKA while trying to connect. I will try
>> to summarise the configurations we´ve tried.
>>
>> - Flink 1.3.0 deployed within a docker container on a Mesos cluster
>> (using Marathon)
>> - This flink has the property jobmanager.rpc.address as a hostname (i.e.
>> kind of ip-X.eu-west-1.compute.internal)
>> - Use the same version for Flink Client remotely (e.g. in my laptop).
>>

Re: Flink CLI cannot submit job to Flink on Mesos

2017-07-31 Thread Francisco Gonzalez Barea
Hi again,

On the other hand, we are running the following flink CLI command:

./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port} ${our-program-jar} ${our-program-params}

Could it be that we are using the command wrongly?
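
As a side note: if the high-availability options are configured on the client, the Flink 1.3 CLI can look up the current JobManager itself, so -m would not be needed at all. A sketch, assuming the cluster uses a ZooKeeper namespace called myClusterNamespace (that name is made up here):

```shell
# Let the client resolve the leading JobManager from ZooKeeper (HA mode)
# instead of pointing at a fixed host with -m. The -z value must match the
# target cluster's ZooKeeper namespace.
./flink run -d -z myClusterNamespace ${our-program-jar} ${our-program-params}
```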

Thank you


Re: Flink CLI cannot submit job to Flink on Mesos

2017-07-31 Thread Francisco Gonzalez Barea
Hi Till,

Thanks for your answer.

We have reviewed the configuration and everything seems fine on our side… But 
we're still getting the message:

“Discard message 
LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 
041b67c7ef765c2f61bd69c2b9dacbce),DETACHED)) because the expected leader 
session ID 9e9e4e4b-1236-4140-9156-fd207929aab5 did not equal the received 
leader session ID 00000000-0000-0000-0000-000000000000.”

The point is that we have another setup running Flink 1.1.3 on YARN, and it's 
working fine. If I compare the configuration values, the main difference I can 
see (apart from the Mesos/YARN-specific parameters) is that on YARN the 
jobmanager.rpc.address is an IP and on Mesos it's a hostname. Might this be 
related?
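
One thing worth noting is that Akka compares remote addresses as strings: a JobManager that announces itself under a hostname is, from Akka's point of view, a different remote system than the same machine addressed by IP, even when both resolve to the same host. A tiny illustration of the idea (the URL format mirrors Akka's akka.tcp scheme; the host values are examples):

```python
def actor_url(host: str, port: int) -> str:
    """Build an Akka-style remote system address string."""
    return f"akka.tcp://flink@{host}:{port}"

# The same machine, addressed two ways, yields two different identities:
by_hostname = actor_url("ip-X.eu.west-1.compute.internal", 6123)
by_ip = actor_url("10.203.23.24", 6123)
assert by_hostname != by_ip
```

So a client configured with the IP can fail the association even though the host is perfectly reachable.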

Thanks in advance.



Re: Flink CLI cannot submit job to Flink on Mesos

2017-07-28 Thread Till Rohrmann
Hi Francisco,

have you set the right high-availability configuration options in your
client configuration as described here [1]? If not, then Flink is not able
to find the correct JobManager because it retrieves the address as well as
a fencing token (called leader session id) from the HA store (ZooKeeper).

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html#high-availability
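
For reference, a minimal client-side flink-conf.yaml fragment for ZooKeeper-based HA might look like this (the quorum hosts and root path are placeholders):

```
# Make the client fetch the leader address and the leader session id
# from ZooKeeper instead of using jobmanager.rpc.address directly.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181
high-availability.zookeeper.path.root: /flink
```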

Cheers,
Till


Flink CLI cannot submit job to Flink on Mesos

2017-07-27 Thread Francisco Gonzalez Barea
Hello,

We're having a lot of issues while trying to submit a job remotely using the 
Flink CLI command-line tool. We have tried different configurations, but in all 
of them we get errors from Akka while trying to connect. I will try to 
summarise the configurations we've tried.

- Flink 1.3.0 deployed within a Docker container on a Mesos cluster (using 
Marathon)
- This Flink has the property jobmanager.rpc.address set to a hostname (i.e. 
something like ip-X.eu.west-1.compute.internal)
- We use the same Flink version for the client remotely (e.g. on my laptop).

When I try to submit the job using the command flink run -m myHostName:myPort 
(the same values as jobmanager.rpc.address and jobmanager.rpc.port), after some 
time waiting I get the trace at the end of this email. On the Flink side we get 
this error from Akka:

Association with remote system [akka.tcp://flink@10.203.23.24:24469] has 
failed, address is now gated for [5000] ms. Reason: [Association failed with 
[akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection refused: 
/10.203.23.24:24469]
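
A quick way to tell a plain reachability problem (firewall, NAT, Docker port mapping on the Mesos agent) from an Akka-level one is to check whether the advertised address and port accept TCP connections at all. A minimal probe, with host and port taken from whatever the error message reports:

```python
import socket

def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a plain TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # connection refused, timed out, host unreachable, ...
        return False
```

If this returns False for the address in the Akka log, the problem is network reachability rather than Flink configuration.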

After reading a bit, it seems there are some problems related to Akka resolving 
hostnames to IPs, so we decided to start up the same Flink but change 
jobmanager.rpc.address to the direct IP (i.e. something like XX.XXX.XX.XX). In 
this case I'm getting the same trace (at the end of the email) on the client 
side and this one from the Flink server:

Discard message 
LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 
b25d5c5ced962632abc5ee9ef867792e),DETACHED)) because the expected leader 
session ID b4f53899-5d70-467e-8e9d-e56eeb60b6e3 did not equal the received 
leader session ID 00000000-0000-0000-0000-000000000000.
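
The discard message is Flink's fencing mechanism at work: every RPC must carry the leader session id that the JobManager currently expects, and a client that never fetched that id from the HA store falls back to the all-zero id. Conceptually (a simplification, not Flink's actual code):

```python
import uuid

NULL_SESSION_ID = uuid.UUID(int=0)  # what a client unaware of HA sends

def accept_message(expected_leader_id: uuid.UUID, received_id: uuid.UUID) -> bool:
    """A fenced receiver only handles messages tagged with the current leader id."""
    return received_id == expected_leader_id

current_leader = uuid.uuid4()
# A client that did not look up the leader id in ZooKeeper gets ignored:
assert not accept_message(current_leader, NULL_SESSION_ID)
```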

We have tried some other stuff but without success… any clue that could help us?

Thanks in advance!

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:454)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:99)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:345)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 milliseconds
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:426)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:451)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:423)
... 16 more




This message is private and confidential. If you have received this message in 
error, please notify the sender or serviced...@piksel.com and remove it from 
your system.

Piksel Inc is a company registered in the United States, 2100 Powers Ferry Road 
SE, Suite 400, Atlanta, GA 30339