Re: flink configuration in flink kubernetes operator question about password

2023-07-26 Thread Jiabao Sun
Hi tian tian,

I think we can use a podTemplate to mount Kubernetes secrets as files or
environment variables.
Then we can access the secrets in our Flink program.

Please refer to

https://github.com/apache/flink-kubernetes-operator/blob/main/examples/pod-template.yaml
 

https://kubernetes.io/docs/concepts/configuration/secret/#using-a-secret 
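As a sketch of what is described above, a hypothetical pod template fragment that exposes a secret both as an environment variable and as mounted files, following the shape of the operator's pod-template example (the secret name `s3-credentials`, its keys, and the mount path are illustrative, not from this thread):

```yaml
# Hypothetical FlinkDeployment fragment; secret name, key, and paths are placeholders.
spec:
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      containers:
        # The Flink container must be named exactly "flink-main-container".
        - name: flink-main-container
          env:
            # Expose one secret key as an environment variable.
            - name: S3_SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: s3-credentials
                  key: secret-key
          volumeMounts:
            # Alternatively, mount all keys of the secret as files.
            - name: s3-secret-volume
              mountPath: /etc/secrets
              readOnly: true
      volumes:
        - name: s3-secret-volume
          secret:
            secretName: s3-credentials
```

The Flink program can then read the value via `System.getenv("S3_SECRET_KEY")` or from the mounted file, instead of keeping it in plaintext configuration.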


On 2023/07/21 10:53:10 tian tian wrote:
> For options like s3.secret-key, a plaintext password cannot be written
> directly into the configuration. Is there a template language like Jinja
> that can substitute the values after the file is mounted into the pod?
> 
> 

Re: flink configuration in flink kubernetes operator question about password

2023-07-21 Thread tian tian
For options like s3.secret-key, a plaintext password cannot be written
directly into the configuration. Is there a template language like Jinja
that can substitute the values after the file is mounted into the pod?



flink configuration in flink kubernetes operator question about password

2023-07-21 Thread tian tian
Hi all, how do we specify passwords and other information that need to be
encrypted in the configuration file?


Re: Relation between Flink Configuration and TableEnv

2021-09-24 Thread Paul Lam
Hi Caizhi,

Thanks a lot for your clarification! Now I understand the design of TableConfig.

Best,
Paul Lam

> On Fri, Sep 24, 2021 at 15:40, Caizhi Weng wrote:
> 
> Hi!
> 
> TableConfig is for configurations related to the Table and SQL API, 
> especially the configurations in OptimizerConfigOptions and 
> ExecutionConfigOptions.
> 
> By Flink Configuration I guess you mean the configuration in Configuration. 
> Sadly, as you say, it can be configured only once when creating the execution 
> environment, and only a few configurations can be modified with the 
> StreamExecutionEnvironment#configure method. This configuration contains 
> datastream and execution related configurations.
> 
> I bet the devs separate the configurations because they belong to different 
> modules, but yes sometimes users will set their configuration in the wrong 
> place.
> 
> Paul Lam <paullin3...@gmail.com> wrote on Fri, Sep 24, 2021 at 00:26:
> Sorry, I mean the relation between Flink Configuration and TableConfig, not 
> TableEnv.
> 
> Best,
> Paul Lam
> 
> Paul Lam <paullin3...@gmail.com> wrote on Fri, Sep 24, 2021 at 00:24:
> Hi all,
> 
> Currently, Flink creates a new Configuration in TableConfig of  
> StreamTableEnvironment, and synchronizes options in it back to the 
> Configuration of the underlying StreamExecutionEnvironment afterward. 
> However, only "relevant" options are set back [1], others are dropped 
> silently. That blocks users from overriding options like 
> DeploymentOptions.ATTACHED from TableConfig. So I'm wondering the reason why 
> not merge all options from TableConfig into the Configuration, and what's the 
> relation between Configuration and TableEnv?
> 
> Thanks a lot!
> 
> [1] 
> https://github.com/apache/flink/blob/36ff71f5ff63a140acc634dd1d98b2bb47a76ba5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L904
>  
> <https://github.com/apache/flink/blob/36ff71f5ff63a140acc634dd1d98b2bb47a76ba5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L904>
> 
> Best,
> Paul Lam
> 



Re: Relation between Flink Configuration and TableEnv

2021-09-24 Thread Caizhi Weng
Hi!

TableConfig is for configurations related to the Table and SQL API,
especially the configurations in OptimizerConfigOptions and
ExecutionConfigOptions.

By Flink Configuration I guess you mean the configuration in Configuration.
Sadly, as you say, it can be configured only once when creating the
execution environment, and only a few configurations can be modified with
the StreamExecutionEnvironment#configure method. This configuration
contains datastream and execution related configurations.

I bet the devs separate the configurations because they belong to different
modules, but yes sometimes users will set their configuration in the wrong
place.

Paul Lam wrote on Fri, Sep 24, 2021 at 00:26:

> Sorry, I mean the relation between Flink Configuration and TableConfig,
> not TableEnv.
>
> Best,
> Paul Lam
>
Paul Lam wrote on Fri, Sep 24, 2021 at 00:24:
>
>> Hi all,
>>
>> Currently, Flink creates a new Configuration in TableConfig of
>> StreamTableEnvironment, and synchronizes options in it back to the
>> Configuration of the underlying StreamExecutionEnvironment afterward.
>> However, only "relevant" options are set back [1], others are dropped
>> silently. That blocks users from overriding options like
>> DeploymentOptions.ATTACHED from TableConfig. So I'm wondering the reason
>> why not merge all options from TableConfig into the Configuration, and
>> what's the relation between Configuration and TableEnv?
>>
>> Thanks a lot!
>>
>> [1]
>> https://github.com/apache/flink/blob/36ff71f5ff63a140acc634dd1d98b2bb47a76ba5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L904
>>
>> Best,
>> Paul Lam
>>
>>


Re: Relation between Flink Configuration and TableEnv

2021-09-23 Thread Paul Lam
Sorry, I mean the relation between Flink Configuration and TableConfig, not
TableEnv.

Best,
Paul Lam

Paul Lam wrote on Fri, Sep 24, 2021 at 00:24:

> Hi all,
>
> Currently, Flink creates a new Configuration in TableConfig of
> StreamTableEnvironment, and synchronizes options in it back to the
> Configuration of the underlying StreamExecutionEnvironment afterward.
> However, only "relevant" options are set back [1], others are dropped
> silently. That blocks users from overriding options like
> DeploymentOptions.ATTACHED from TableConfig. So I'm wondering the reason
> why not merge all options from TableConfig into the Configuration, and
> what's the relation between Configuration and TableEnv?
>
> Thanks a lot!
>
> [1]
> https://github.com/apache/flink/blob/36ff71f5ff63a140acc634dd1d98b2bb47a76ba5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L904
>
> Best,
> Paul Lam
>
>


Relation between Flink Configuration and TableEnv

2021-09-23 Thread Paul Lam
Hi all,

Currently, Flink creates a new Configuration in TableConfig of
StreamTableEnvironment, and synchronizes options in it back to the
Configuration of the underlying StreamExecutionEnvironment afterward.
However, only "relevant" options are set back [1]; the others are dropped
silently. That blocks users from overriding options like
DeploymentOptions.ATTACHED from TableConfig. So I'm wondering why all
options from TableConfig aren't merged into the Configuration, and what
the relation between Configuration and TableEnv is.

Thanks a lot!

[1]
https://github.com/apache/flink/blob/36ff71f5ff63a140acc634dd1d98b2bb47a76ba5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L904

Best,
Paul Lam


Re: flink configuration: best practice for checkpoint storage secrets

2020-10-08 Thread XU Qinghui
Hello Till

Thanks a lot for the reply. But it turns out IAM is applicable only when
the job is running inside AWS, which is not my case (basically we are just
using the S3 API provided by other services).
Reading the Flink docs again, though, they do seem to suggest using the
flink-conf.yaml file.
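For reference, the flink-conf.yaml approach looks like the following. The option keys are Flink's S3 credential options; the bucket name and values are placeholders:

```yaml
# flink-conf.yaml — plaintext credentials; restrict access to this file
# with filesystem permissions, since the values are not encrypted.
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
state.checkpoints.dir: s3://<your-bucket>/checkpoints
```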

Best regards,
Qinghui

On Wed, Oct 7, 2020 at 18:21, Till Rohrmann wrote:

> Hi Qinghui,
>
> the recommended way would be to use AWS identity and access management
> (IAM) [1] if possible.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#configure-access-credentials
>
> Cheers,
> Till
>
> On Wed, Oct 7, 2020 at 12:31 PM XU Qinghui  wrote:
>
>> Hello, folks
>>
>> We are trying to use S3 for the checkpoint storage, and this
>> involves some secrets in the configuration. We tried two approaches to
>> configure those secrets:
>> - in the jvm application argument for jobmanager and taskmanager, such as
>> -Ds3.secret-key
>> - in the flink-conf.yaml file for jobmanager and taskmanager
>>
>> Is there a third way? What's the best practice?
>> Thanks a lot!
>>
>> Best regards,
>> Qinghui
>>
>


Re: flink configuration: best practice for checkpoint storage secrets

2020-10-07 Thread Till Rohrmann
Hi Qinghui,

the recommended way would be to use AWS identity and access management
(IAM) [1] if possible.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#configure-access-credentials

Cheers,
Till

On Wed, Oct 7, 2020 at 12:31 PM XU Qinghui  wrote:

> Hello, folks
>
> We are trying to use S3 for the checkpoint storage, and this involves some
> secrets in the configuration. We tried two approaches to configure those
> secrets:
> - in the jvm application argument for jobmanager and taskmanager, such as
> -Ds3.secret-key
> - in the flink-conf.yaml file for jobmanager and taskmanager
>
> Is there a third way? What's the best practice?
> Thanks a lot!
>
> Best regards,
> Qinghui
>


flink configuration: best practice for checkpoint storage secrets

2020-10-07 Thread XU Qinghui
Hello, folks

We are trying to use S3 for the checkpoint storage, and this involves some
secrets in the configuration. We tried two approaches to configure those
secrets:
- as a JVM argument for the jobmanager and taskmanager, such as
-Ds3.secret-key
- in the flink-conf.yaml file for the jobmanager and taskmanager

Is there a third way? What's the best practice?
Thanks a lot!

Best regards,
Qinghui


Re: Optimal Flink configuration for Standalone cluster.

2020-06-29 Thread Dimitris Vogiatzidakis
>
> It could really be specific to your workload. Some workload may need more
> heap memory while others may need more off-heap.
>
The main 'process' of my project creates a cross product of datasets and
then applies a function to all of them to extract some features.


> Alternatively, you can try to launch multiple TMs on one physical machine,
> to reduce the memory size of each TM process.

If I understand correctly, you mean that instead of 1 TM with 32 slots, I
should have 4 TMs with 8? Otherwise I would exceed the total number of
cores and probably have tasks 'waiting' on other tasks to complete.


BTW, what kind of workload are you running? Is it streaming or batch?
>
It is batch. I have a dataset of edges and try to extract features, to be
used later for link prediction.

Thank you
-Dimitris Vogiatzidakis

>

On Mon, Jun 29, 2020 at 5:07 AM Xintong Song  wrote:

> Since changing off-heap removes memory from '.task.heap.size' is there a
>> ratio that I should follow for better performance?
>>
> I don't think so. It could really be specific to your workload. Some
> workload may need more heap memory while others may need more off-heap.
>
> Also, my guess (since I am dealing with big datasets) is that the more
>> '.flink.size' I provide the better. Is that correct?
>>
> In most cases, yes. But it is also possible the other way around. Larger
> `.flink.size` usually also means larger JVM heap space, which reduces the
> frequency of GCs but increases the time cost on each GC (especially full 
> GCs). On the other hand, if the memory is large enough, it could become the
> CPU resource rather than the memory that limits the performance. In such
> cases, increasing memory size won't give you more performance improvement
> but might introduce more GC overheads, thus harm the overall performance.
>
> In this particular cluster, since every Machine has 252 total DRAM and
>> worst case scenario 180GB is free to use, should I just say .flink.size:
>> 180g?
>>
> Not sure about this. I would suggest to avoid large task managers (say
> tens of GBs) unless absolutely necessary. Alternatively, you can try to
> launch multiple TMs on one physical machine, to reduce the memory size of
> each TM process.
>
> BTW, what kind of workload are you running? Is it streaming or batch?
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Jun 29, 2020 at 1:18 AM Dimitris Vogiatzidakis <
> dimitrisvogiatzida...@gmail.com> wrote:
>
>> Hi Xintong,
>> Thank you for the quick response.
>> doing 1), without increasing  'task.off-heap.size'  does not change the
>> issue, but increasing the off-heap alone does.
>> What should the off-heap value size be? Since changing off-heap removes
>> memory from '.task.heap.size' is there a ratio that I should follow for
>> better performance?
>> Also, my guess (since I am dealing with big datasets) is that the more
>> '.flink.size' I provide the better. Is that correct? Or will it add extra
>> 'overhead' that could slow down my computations? In this particular
>> cluster, since every Machine has 252 total DRAM and worst case scenario
>> 180GB is free to use, should I just say .flink.size: 180g?
>>
>> Thank you very much and sorry if i'm asking silly questions.
>> Dimitris Vogiatzidakis
>>
>> On Sun, Jun 28, 2020 at 5:25 AM Xintong Song 
>> wrote:
>>
>>> Hi Dimitris,
>>>
>>> Regarding your questions.
>>> a) For standalone clusters, the recommended way is to use `.flink.size`
>>> rather than `.process.size`. `.process.size` includes JVM metaspace and
>>> overhead in addition to `.flink.size`, which usually do not really matter
>>> for standalone clusters.
>>> b) In case of direct OOMs, you should increase
>>> `taskmanager.memory.task.off-heap.size`. There's no fraction for that.
>>> c) Your understanding is correct. And you can also specify the absolute
>>> network memory size by setting the min and max to the same value.
>>>
>>> Here are my suggestions according to what you described.
>>>
>>>1. Since both off-heap and network memory seems insufficient, I
>>>would suggest to increase `taskmanager.memory.flink.size` to give your 
>>> task
>>>managers more memory in total.
>>>2. If 1) does not work, I would suggest not to set the total memory
>>>(means configure neither `.flink.size` nor `process.size`), but go for 
>>> the
>>>fine grained configuration where explicitly specify the individual memory
>>>components. Flink will automatically add them up to derive the total 
>>> memory.
>>>   1. In addition to `.task.off-heap.size` and `.network.[min|max]`,
>>>   you will also need to set `.task.heap.size` and `managed.size`.
>>>   2. If you don't know how many heap/managed memory to configure,
>>>   you can look for the configuration options in the beginning of the TM 
>>> logs
>>>   (`-Dkey=value`). Those are the values derived from your current
>>>   configuration.
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Sat, Jun 27, 2020 at 10:56 PM 

Re: Optimal Flink configuration for Standalone cluster.

2020-06-28 Thread Xintong Song
>
> Since changing off-heap removes memory from '.task.heap.size' is there a
> ratio that I should follow for better performance?
>
I don't think so. It could really be specific to your workload. Some
workload may need more heap memory while others may need more off-heap.

Also, my guess (since I am dealing with big datasets) is that the more
> '.flink.size' I provide the better. Is that correct?
>
In most cases, yes. But it is also possible the other way around. Larger
`.flink.size` usually also means larger JVM heap space, which reduces the
frequency of GCs but increases the time cost on each GC (especially full
GCs). On the other hand, if the memory is large enough, it could become the
CPU resource rather than the memory that limits the performance. In such
cases, increasing memory size won't give you more performance improvement
but might introduce more GC overheads, thus harm the overall performance.

In this particular cluster, since every Machine has 252 total DRAM and
> worst case scenario 180GB is free to use, should I just say .flink.size:
> 180g?
>
Not sure about this. I would suggest avoiding large task managers (say tens
of GBs) unless absolutely necessary. Alternatively, you can try launching
multiple TMs on one physical machine, to reduce the memory size of each TM
process.

BTW, what kind of workload are you running? Is it streaming or batch?


Thank you~

Xintong Song



On Mon, Jun 29, 2020 at 1:18 AM Dimitris Vogiatzidakis <
dimitrisvogiatzida...@gmail.com> wrote:

> Hi Xintong,
> Thank you for the quick response.
> doing 1), without increasing  'task.off-heap.size'  does not change the
> issue, but increasing the off-heap alone does.
> What should the off-heap value size be? Since changing off-heap removes
> memory from '.task.heap.size' is there a ratio that I should follow for
> better performance?
> Also, my guess (since I am dealing with big datasets) is that the more
> '.flink.size' I provide the better. Is that correct? Or will it add extra
> 'overhead' that could slow down my computations? In this particular
> cluster, since every Machine has 252 total DRAM and worst case scenario
> 180GB is free to use, should I just say .flink.size: 180g?
>
> Thank you very much and sorry if i'm asking silly questions.
> Dimitris Vogiatzidakis
>
> On Sun, Jun 28, 2020 at 5:25 AM Xintong Song 
> wrote:
>
>> Hi Dimitris,
>>
>> Regarding your questions.
>> a) For standalone clusters, the recommended way is to use `.flink.size`
>> rather than `.process.size`. `.process.size` includes JVM metaspace and
>> overhead in addition to `.flink.size`, which usually do not really matter
>> for standalone clusters.
>> b) In case of direct OOMs, you should increase
>> `taskmanager.memory.task.off-heap.size`. There's no fraction for that.
>> c) Your understanding is correct. And you can also specify the absolute
>> network memory size by setting the min and max to the same value.
>>
>> Here are my suggestions according to what you described.
>>
>>1. Since both off-heap and network memory seems insufficient, I would
>>suggest to increase `taskmanager.memory.flink.size` to give your task
>>managers more memory in total.
>>2. If 1) does not work, I would suggest not to set the total memory
>>(means configure neither `.flink.size` nor `process.size`), but go for the
>>fine grained configuration where explicitly specify the individual memory
>>components. Flink will automatically add them up to derive the total 
>> memory.
>>   1. In addition to `.task.off-heap.size` and `.network.[min|max]`,
>>   you will also need to set `.task.heap.size` and `managed.size`.
>>   2. If you don't know how many heap/managed memory to configure,
>>   you can look for the configuration options in the beginning of the TM 
>> logs
>>   (`-Dkey=value`). Those are the values derived from your current
>>   configuration.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Sat, Jun 27, 2020 at 10:56 PM Dimitris Vogiatzidakis <
>> dimitrisvogiatzida...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I'm having a bit of trouble understanding the memory configuration on
>>> flink.
>>> I'm using flink10.0.0 to read some datasets of edges and extract
>>> features. I run this on a cluster consisting of 4 nodes , with 32cores and
>>> 252GB Ram each, and hopefully I could expand this as long as I can add
>>> extra nodes to the cluster.
>>>
>>> So regarding the configuration file (flink-conf.yaml).
>>> a) I can't understand when should I use process.size and when
>>> .flink.size.
>>>
>>> b) From the detailed memory model I understand that Direct memory is
>>> included in both of flink and process size, however if I don't specify
>>> off-heap.task.size I get
>>> " OutOfMemoryError: Direct buffer memory " .  Also should I change
>>> off-heap.fraction as well?
>>>
>>> c)When I fix this, I get network buffers error, which if I understand
>>> correctly,  flink.size * network fraction , should be between min 

Re: Optimal Flink configuration for Standalone cluster.

2020-06-28 Thread Dimitris Vogiatzidakis
Hi Xintong,
Thank you for the quick response.
Doing 1) without increasing 'task.off-heap.size' does not change the issue,
but increasing the off-heap size alone does.
What should the off-heap size be? Since changing off-heap removes memory
from '.task.heap.size', is there a ratio that I should follow for better
performance?
Also, my guess (since I am dealing with big datasets) is that the more
'.flink.size' I provide the better. Is that correct? Or will it add extra
'overhead' that could slow down my computations? In this particular
cluster, since every machine has 252 GB of total DRAM and in the worst case
180 GB is free to use, should I just say .flink.size: 180g?

Thank you very much, and sorry if I'm asking silly questions.
Dimitris Vogiatzidakis

On Sun, Jun 28, 2020 at 5:25 AM Xintong Song  wrote:

> Hi Dimitris,
>
> Regarding your questions.
> a) For standalone clusters, the recommended way is to use `.flink.size`
> rather than `.process.size`. `.process.size` includes JVM metaspace and
> overhead in addition to `.flink.size`, which usually do not really matter
> for standalone clusters.
> b) In case of direct OOMs, you should increase
> `taskmanager.memory.task.off-heap.size`. There's no fraction for that.
> c) Your understanding is correct. And you can also specify the absolute
> network memory size by setting the min and max to the same value.
>
> Here are my suggestions according to what you described.
>
>1. Since both off-heap and network memory seems insufficient, I would
>suggest to increase `taskmanager.memory.flink.size` to give your task
>managers more memory in total.
>2. If 1) does not work, I would suggest not to set the total memory
>(means configure neither `.flink.size` nor `process.size`), but go for the
>fine grained configuration where explicitly specify the individual memory
>components. Flink will automatically add them up to derive the total 
> memory.
>   1. In addition to `.task.off-heap.size` and `.network.[min|max]`,
>   you will also need to set `.task.heap.size` and `managed.size`.
>   2. If you don't know how many heap/managed memory to configure, you
>   can look for the configuration options in the beginning of the TM logs
>   (`-Dkey=value`). Those are the values derived from your current
>   configuration.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sat, Jun 27, 2020 at 10:56 PM Dimitris Vogiatzidakis <
> dimitrisvogiatzida...@gmail.com> wrote:
>
>> Hello,
>>
>> I'm having a bit of trouble understanding the memory configuration on
>> flink.
>> I'm using flink10.0.0 to read some datasets of edges and extract
>> features. I run this on a cluster consisting of 4 nodes , with 32cores and
>> 252GB Ram each, and hopefully I could expand this as long as I can add
>> extra nodes to the cluster.
>>
>> So regarding the configuration file (flink-conf.yaml).
>> a) I can't understand when should I use process.size and when
>> .flink.size.
>>
>> b) From the detailed memory model I understand that Direct memory is
>> included in both of flink and process size, however if I don't specify
>> off-heap.task.size I get
>> " OutOfMemoryError: Direct buffer memory " .  Also should I change
>> off-heap.fraction as well?
>>
>> c)When I fix this, I get network buffers error, which if I understand
>> correctly,  flink.size * network fraction , should be between min and max.
>>
>> I can't find the 'perfect' configuration regarding my setup. What is the
>> optimal way to use the system I have currently?
>>
>> Thank you for your time.
>>
>>
>>


Re: Optimal Flink configuration for Standalone cluster.

2020-06-27 Thread Xintong Song
Hi Dimitris,

Regarding your questions.
a) For standalone clusters, the recommended way is to use `.flink.size`
rather than `.process.size`. `.process.size` includes JVM metaspace and
overhead in addition to `.flink.size`, which usually do not really matter
for standalone clusters.
b) In case of direct OOMs, you should increase
`taskmanager.memory.task.off-heap.size`. There's no fraction for that.
c) Your understanding is correct. And you can also specify the absolute
network memory size by setting the min and max to the same value.

Here are my suggestions according to what you described.

   1. Since both off-heap and network memory seem insufficient, I would
   suggest increasing `taskmanager.memory.flink.size` to give your task
   managers more memory in total.
   2. If 1) does not work, I would suggest not setting the total memory
   (i.e., configure neither `.flink.size` nor `.process.size`), but going
   for the fine-grained configuration where you explicitly specify the
   individual memory components. Flink will automatically add them up to
   derive the total memory.
      1. In addition to `.task.off-heap.size` and `.network.[min|max]`, you
      will also need to set `.task.heap.size` and `.managed.size`.
      2. If you don't know how much heap/managed memory to configure, you
      can look for the configuration options in the beginning of the TM
      logs (`-Dkey=value`). Those are the values derived from your current
      configuration.
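A fine-grained configuration along the lines of suggestion 2 might look like this in flink-conf.yaml. The sizes below are placeholders chosen to illustrate the shape, not tuned recommendations:

```yaml
# flink-conf.yaml — explicit memory components; with neither .flink.size
# nor .process.size set, Flink derives the total from their sum.
taskmanager.memory.task.heap.size: 8g
taskmanager.memory.managed.size: 8g
taskmanager.memory.task.off-heap.size: 2g
# Pin the network memory to an absolute size by setting min == max.
taskmanager.memory.network.min: 1g
taskmanager.memory.network.max: 1g
```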


Thank you~

Xintong Song



On Sat, Jun 27, 2020 at 10:56 PM Dimitris Vogiatzidakis <
dimitrisvogiatzida...@gmail.com> wrote:

> Hello,
>
> I'm having a bit of trouble understanding the memory configuration on
> flink.
> I'm using flink10.0.0 to read some datasets of edges and extract features.
> I run this on a cluster consisting of 4 nodes , with 32cores and 252GB Ram
> each, and hopefully I could expand this as long as I can add extra nodes to
> the cluster.
>
> So regarding the configuration file (flink-conf.yaml).
> a) I can't understand when should I use process.size and when .flink.size.
>
> b) From the detailed memory model I understand that Direct memory is
> included in both of flink and process size, however if I don't specify
> off-heap.task.size I get
> " OutOfMemoryError: Direct buffer memory " .  Also should I change
> off-heap.fraction as well?
>
> c)When I fix this, I get network buffers error, which if I understand
> correctly,  flink.size * network fraction , should be between min and max.
>
> I can't find the 'perfect' configuration regarding my setup. What is the
> optimal way to use the system I have currently?
>
> Thank you for your time.
>
>
>


Optimal Flink configuration for Standalone cluster.

2020-06-27 Thread Dimitris Vogiatzidakis
Hello,

I'm having a bit of trouble understanding the memory configuration in
Flink.
I'm using Flink 1.10.0 to read some datasets of edges and extract features.
I run this on a cluster consisting of 4 nodes, with 32 cores and 252 GB of
RAM each, and hopefully I can expand this as long as I can add extra nodes
to the cluster.

So, regarding the configuration file (flink-conf.yaml):
a) I can't understand when I should use .process.size and when .flink.size.

b) From the detailed memory model I understand that direct memory is
included in both the flink and process sizes; however, if I don't specify
off-heap.task.size I get
"OutOfMemoryError: Direct buffer memory". Also, should I change
off-heap.fraction as well?

c) When I fix this, I get a network buffers error; if I understand
correctly, flink.size * network fraction should be between min and max.

I can't find the 'perfect' configuration for my setup. What is the optimal
way to use the system I currently have?

Thank you for your time.


Re: Flink configuration on Docker deployment

2020-01-22 Thread Soheil Pourbafrani
Thanks a lot!

On Wed, Jan 22, 2020 at 3:58 AM Yang Wang  wrote:

> Hi Soheil,
>
> Since you are not using any container orchestration framework(e.g.
> docker-compose, Kubernetes,
> mesos), so you need to manually update the flink-conf.yaml in your docker
> images. Usually, it is
> located in the path "/opt/flink/conf".
> Docker volume also could be used to override the flink configuration when
> you start the jobmanager
> and taskmanager containers[1].
>
> Best,
> Yang
>
> [1]. https://docs.docker.com/storage/volumes/
>
> Soheil Pourbafrani wrote on Tue, Jan 21, 2020 at 19:46:
>
>> Hi,
>>
>> I need to set up a Flink cluster using Docker (and not using
>> docker-compose). I could successfully start the jobmanager and taskmanager,
>> but the problem is I have no idea how to change the default configuration
>> for them. For example in the case of giving 8 slots to the taskmanager or
>> change the memory size of both jobmanager and taskmanager.
>> It will be appreciated if somebody tells me how to change the Flink
>> parameters on docker
>>
>> Thanks
>>
>


Re: Flink configuration on Docker deployment

2020-01-21 Thread Yang Wang
Hi Soheil,

Since you are not using any container orchestration framework (e.g.
docker-compose, Kubernetes, Mesos), you need to manually update the
flink-conf.yaml in your Docker images. Usually it is located under
"/opt/flink/conf".
A Docker volume can also be used to override the Flink configuration when
you start the jobmanager and taskmanager containers [1].

Best,
Yang

[1]. https://docs.docker.com/storage/volumes/
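As a sketch of the volume approach: prepare an edited flink-conf.yaml on the host and mount it over the container's copy when starting each container. The option values below are illustrative for the Flink 1.9 era, not recommendations:

```yaml
# flink-conf.yaml on the host, mounted over /opt/flink/conf/flink-conf.yaml
# (e.g. with: docker run -v /host/conf:/opt/flink/conf ...).
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 8
jobmanager.heap.size: 2048m
taskmanager.heap.size: 8192m
```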

Soheil Pourbafrani wrote on Tue, Jan 21, 2020 at 19:46:

> Hi,
>
> I need to set up a Flink cluster using Docker (and not using
> docker-compose). I could successfully start the jobmanager and taskmanager,
> but the problem is I have no idea how to change the default configuration
> for them. For example in the case of giving 8 slots to the taskmanager or
> change the memory size of both jobmanager and taskmanager.
> It will be appreciated if somebody tells me how to change the Flink
> parameters on docker
>
> Thanks
>


Flink configuration on Docker deployment

2020-01-21 Thread Soheil Pourbafrani
Hi,

I need to set up a Flink cluster using Docker (and not using
docker-compose). I could successfully start the jobmanager and taskmanager,
but the problem is I have no idea how to change the default configuration
for them, for example giving 8 slots to the taskmanager or changing the
memory size of both the jobmanager and taskmanager.
It would be appreciated if somebody could tell me how to change the Flink
parameters on Docker.

Thanks


Re: Flink configuration at runtime

2019-11-19 Thread Robert Metzger
Hi Amran,
thanks a lot for your message.

I think this is a valid feature request. I've created a JIRA ticket to
track it: https://issues.apache.org/jira/browse/FLINK-14856 (this does not
mean it will be addressed immediately; however, there are currently quite a
few improvements to the configuration system in Flink underway, as part of
FLIP-59 and FLIP-81).

Best,
Robert



On Tue, Nov 19, 2019 at 4:09 AM vino yang  wrote:

> Hi Amran,
>
> Change the config option at runtime? No, Flink does not support this
> feature currently.
>
> However, for Flink on Yarn job cluster mode, you can specify different
> config options for different jobs via program or flink-conf.yaml(copy a new
> flink binary package then change config file).
>
> Best,
> Vino
>
amran dean wrote on Tue, Nov 19, 2019 at 05:53:
>
>> Is it possible to configure certain settings at runtime, on a per-job
>> basis rather than globally within flink-conf.yaml?
>>
>> For example, I have a job where it's desirable to retain a large number
>> of checkpoints via
>> state.checkpoints.num-retained.
>>
>> The checkpoints are cheap, and it's low cost. For other jobs, I don't
>> want such a large number.
>>
>>
>>


Re: Flink configuration at runtime

2019-11-18 Thread vino yang
Hi Amran,

Change the config option at runtime? No, Flink does not currently support
this feature.

However, for Flink-on-YARN job cluster mode, you can specify different
config options for different jobs via the program or via flink-conf.yaml
(copy a new Flink binary package, then change the config file).
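For the copy-and-edit approach described above, only the configuration inside the per-job copy of the distribution changes; the value below is illustrative:

```yaml
# conf/flink-conf.yaml inside the Flink distribution copied for this one job
state.checkpoints.num-retained: 20
```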

Best,
Vino

amran dean wrote on Tue, Nov 19, 2019 at 05:53:

> Is it possible to configure certain settings at runtime, on a per-job
> basis rather than globally within flink-conf.yaml?
>
> For example, I have a job where it's desirable to retain a large number of
> checkpoints via
> state.checkpoints.num-retained.
>
> The checkpoints are cheap, and it's low cost. For other jobs, I don't want
> such a large number.
>
>
>


Re: Flink configuration at runtime

2019-11-18 Thread Zhu Zhu
Hi Amran,

Some configs, including "state.checkpoints.num-retained", are cluster
configs that always apply to the entire Flink cluster.
An alternative is to use per-job mode if you are running Flink jobs on
k8s/docker or YARN, that is, to create a dedicated Flink cluster for a
single job.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/docker.html#flink-job-cluster
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn

Thanks,
Zhu Zhu

amran dean  于2019年11月19日周二 上午5:53写道:

> Is it possible to configure certain settings at runtime, on a per-job
> basis rather than globally within flink-conf.yaml?
>
> For example, I have a job where it's desirable to retain a large number of
> checkpoints via
> state.checkpoints.num-retained.
>
> The checkpoints are cheap, so retaining them is low cost. For other jobs,
> I don't want such a large number.
>
>
>
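The copy-and-edit approach suggested above (clone the conf directory, change one key, then submit the job against the clone) can be sketched in a few lines of Python. The key name is a real Flink option; the helper itself is illustrative only and not part of any Flink tooling:

```python
import shutil
import tempfile
from pathlib import Path

def clone_conf_with_overrides(conf_dir, overrides):
    """Copy a Flink conf directory and rewrite flink-conf.yaml with per-job overrides."""
    new_dir = Path(tempfile.mkdtemp(prefix="flink-conf-"))
    # dirs_exist_ok requires Python 3.8+ (mkdtemp already created the target).
    shutil.copytree(conf_dir, new_dir, dirs_exist_ok=True)
    conf_file = new_dir / "flink-conf.yaml"
    out, seen = [], set()
    for line in conf_file.read_text().splitlines():
        key = line.split(":", 1)[0].strip()
        if key in overrides:                      # replace an existing key
            out.append(f"{key}: {overrides[key]}")
            seen.add(key)
        else:
            out.append(line)
    for key, value in overrides.items():          # append keys not present yet
        if key not in seen:
            out.append(f"{key}: {value}")
    conf_file.write_text("\n".join(out) + "\n")
    return new_dir

# Demo: derive a per-job conf dir that retains more checkpoints.
base = Path(tempfile.mkdtemp(prefix="flink-base-"))
(base / "flink-conf.yaml").write_text("state.checkpoints.num-retained: 1\n")
job_conf = clone_conf_with_overrides(base, {"state.checkpoints.num-retained": "20"})
print((job_conf / "flink-conf.yaml").read_text().strip())
```

One would then point FLINK_CONF_DIR at the returned directory before invoking the client, so only that submission sees the overridden value.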


Flink configuration at runtime

2019-11-18 Thread amran dean
Is it possible to configure certain settings at runtime, on a per-job basis
rather than globally within flink-conf.yaml?

For example, I have a job where it's desirable to retain a large number of
checkpoints via
state.checkpoints.num-retained.

The checkpoints are cheap, so retaining them is low cost. For other jobs,
I don't want such a large number.


Re: Does Flink configuration support being configured via environment variables?

2019-04-01 Thread Lifei Chen
Thanks, maybe overwriting the configuration in 'conf/flink-conf.yaml' via
docker-entrypoint.sh is the common way to do it. Thanks for your advice.

Stephen Connolly  于2019年4月1日周一 下午10:33写道:

> I don't think it does. I ended up writing a small CLI tool to enable
> templating the file from environment variables. There are loads of such
> tools, but mine is https://github.com/stephenc/envsub
>
> I have the dockerfile like so:
>
> ARG FLINK_VERSION=1.7.2-alpine
> FROM flink:${FLINK_VERSION}
> ARG ENVSUB=0.1.0::SHA::b10600c03236bbf0711476e11a1dff9ae285a50a48568bfd0bf6c6014fc69f0c
> RUN apk add --no-cache tini curl \
> #
> # Get envsub
> #
> \
> && curl -fsSL "https://github.com/stephenc/envsub/releases/download/${ENVSUB%%::SHA::*}/envsub" \
> -o /usr/local/bin/envsub \
> && if [ "${ENVSUB##*::SHA::}" = "${ENVSUB}" ] ; then \
> echo "/usr/local/bin/envsub: Unverified" >&2 ; \
> else \
> echo "${ENVSUB##*::SHA::}  /usr/local/bin/envsub" | sha256sum -c - ; \
> fi \
> && chmod +x /usr/local/bin/envsub
>
> COPY rootfs/ /
>
> ENTRYPOINT ["/sbin/tini", "-g", "--", "/docker-entrypoint2.sh"]
>
> CMD []
>
>
> Then docker-entrypoint2.sh looks like
>
> #!/usr/bin/env bash
>
> if [[ "$#" -eq 0 ]]; then
> if [[ -z "${FLINK_ROLE}" ]]; then
> echo "Please set environment variable FLINK_ROLE to either
> jobmanager or taskmanager"
> exit 1
> fi
> envsub < /opt/flink/conf/flink-conf.template.yaml > /opt/flink/conf/flink-conf.yaml
> exec /docker-entrypoint.sh "${FLINK_ROLE}"
> fi
>
> exec /docker-entrypoint.sh "${@}"
>
>
> and /opt/flink/conf/flink-conf.template.yaml has the environment variable
> substitution like so:
>
> fs.s3a.endpoint: ${FLINK_S3_ENDPOINT}
> fs.s3a.path.style.access: true
> fs.s3a.connection.ssl.enabled: false
> fs.s3a.access.key: ${AWS_ACCESS_KEY_ID}
> fs.s3a.secret.key: ${AWS_SECRET_KEY}
>
>
> Hope that is sufficient for you to derive your own solution
>
>
> On Fri, 29 Mar 2019 at 03:09, Lifei Chen  wrote:
>
>> Hi guys,
>>
>> I am using Flink 1.7.2 deployed on Kubernetes, and I want to change the
>> Flink configuration, for example to customize
>> `taskmanager.heap.size`.
>>
>> Does Flink support using environment variables to override configurations
>> in `conf/flink-conf.yaml`?
>>
>


Re: Does Flink configuration support being configured via environment variables?

2019-04-01 Thread Stephen Connolly
I don't think it does. I ended up writing a small CLI tool to enable
templating the file from environment variables. There are loads of such
tools, but mine is https://github.com/stephenc/envsub

I have the dockerfile like so:

ARG FLINK_VERSION=1.7.2-alpine
FROM flink:${FLINK_VERSION}
ARG ENVSUB=0.1.0::SHA::b10600c03236bbf0711476e11a1dff9ae285a50a48568bfd0bf6c6014fc69f0c
RUN apk add --no-cache tini curl \
#
# Get envsub
#
\
&& curl -fsSL "https://github.com/stephenc/envsub/releases/download/${ENVSUB%%::SHA::*}/envsub" \
-o /usr/local/bin/envsub \
&& if [ "${ENVSUB##*::SHA::}" = "${ENVSUB}" ] ; then \
echo "/usr/local/bin/envsub: Unverified" >&2 ; \
else \
echo "${ENVSUB##*::SHA::}  /usr/local/bin/envsub" | sha256sum -c - ; \
fi \
&& chmod +x /usr/local/bin/envsub

COPY rootfs/ /

ENTRYPOINT ["/sbin/tini", "-g", "--", "/docker-entrypoint2.sh"]

CMD []


Then docker-entrypoint2.sh looks like

#!/usr/bin/env bash

if [[ "$#" -eq 0 ]]; then
if [[ -z "${FLINK_ROLE}" ]]; then
echo "Please set environment variable FLINK_ROLE to either
jobmanager or taskmanager"
exit 1
fi
envsub < /opt/flink/conf/flink-conf.template.yaml > /opt/flink/conf/flink-conf.yaml
exec /docker-entrypoint.sh "${FLINK_ROLE}"
fi

exec /docker-entrypoint.sh "${@}"


and /opt/flink/conf/flink-conf.template.yaml has the environment variable
substitution like so:

fs.s3a.endpoint: ${FLINK_S3_ENDPOINT}
fs.s3a.path.style.access: true
fs.s3a.connection.ssl.enabled: false
fs.s3a.access.key: ${AWS_ACCESS_KEY_ID}
fs.s3a.secret.key: ${AWS_SECRET_KEY}


Hope that is sufficient for you to derive your own solution


On Fri, 29 Mar 2019 at 03:09, Lifei Chen  wrote:

> Hi guys,
>
> I am using Flink 1.7.2 deployed on Kubernetes, and I want to change the
> Flink configuration, for example to customize
> `taskmanager.heap.size`.
>
> Does Flink support using environment variables to override configurations
> in `conf/flink-conf.yaml`?
>
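The core of what envsub does here, expanding ${VAR} placeholders from the environment into flink-conf.yaml, is small enough to sketch directly. This Python version is illustrative only (the real tool is linked above), and it deliberately leaves unknown variables untouched:

```python
import os
import re

def envsub(text, env=None):
    """Replace ${VAR} placeholders with environment values; unknown vars stay as-is."""
    env = os.environ if env is None else env
    return re.sub(
        r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}",
        lambda m: env.get(m.group(1), m.group(0)),  # keep ${VAR} if undefined
        text,
    )

# Two lines from the template above, rendered with a fake key for the demo.
template = "fs.s3a.access.key: ${AWS_ACCESS_KEY_ID}\nfs.s3a.path.style.access: true\n"
rendered = envsub(template, {"AWS_ACCESS_KEY_ID": "example-key"})
print(rendered, end="")
```

Running this over flink-conf.template.yaml and writing the result to flink-conf.yaml mirrors the docker-entrypoint2.sh step above.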


Does Flink configuration support being configured via environment variables?

2019-03-28 Thread Lifei Chen
Hi guys,

I am using Flink 1.7.2 deployed on Kubernetes, and I want to change the
Flink configuration, for example to customize
`taskmanager.heap.size`.

Does Flink support using environment variables to override configurations
in `conf/flink-conf.yaml`?


Re: Access Flink configuration in user functions

2018-12-28 Thread Chesnay Schepler

The configuration is not accessible to user-functions or the main method.

You could either override ConfigurableStateBackend#configure, or
configure the state backend globally (see
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html#setting-default-state-backend);
the backend factory does get access to it, I believe.


On 28.12.2018 05:56, Paul Lam wrote:

Hi to all,

I would like to use a custom RocksDBStateBackend which uses the
default checkpoint dir from the Flink configuration, but I failed to find a
way to access the Flink configuration in user code. So I wonder: is it
possible to retrieve Flink configuration values (not user-defined global
parameters) in the user main method? Or is the configuration only used
internally? Thanks!


Best,
Paul Lam
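If only a single flat value such as state.checkpoints.dir is needed outside Flink's own classes, one blunt workaround is to parse flink-conf.yaml from the conf directory yourself. The Python sketch below is purely illustrative of that idea; in a JVM main method, Flink's GlobalConfiguration.loadConfiguration() should, I believe, play the same role when FLINK_CONF_DIR is set:

```python
import tempfile
from pathlib import Path

def load_flink_conf(conf_dir):
    """Very naive flink-conf.yaml reader: flat 'key: value' lines, '#' comments."""
    conf = {}
    for line in (Path(conf_dir) / "flink-conf.yaml").read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)  # split on the first colon only
        conf[key.strip()] = value.strip()
    return conf

# Demo with a throwaway conf dir standing in for $FLINK_CONF_DIR.
conf_dir = tempfile.mkdtemp(prefix="flink-conf-")
Path(conf_dir, "flink-conf.yaml").write_text(
    "# checkpointing\nstate.checkpoints.dir: hdfs:///flink/checkpoints\n"
)
conf = load_flink_conf(conf_dir)
print(conf["state.checkpoints.dir"])
```

This obviously only covers the flat key-value subset of the file, which is what flink-conf.yaml in these versions consists of.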





Access Flink configuration in user functions

2018-12-27 Thread Paul Lam
Hi to all,

I would like to use a custom RocksDBStateBackend which uses the default
checkpoint dir from the Flink configuration, but I failed to find a way to
access the Flink configuration in user code. So I wonder: is it possible to
retrieve Flink configuration values (not user-defined global parameters) in
the user main method? Or is the configuration only used internally? Thanks!

Best,
Paul Lam



Re: Flink configuration

2017-01-25 Thread Greg Hogan
Has anyone reported decreased performance with hyper-threading?

On Tue, Jan 24, 2017 at 11:18 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> that wording is from a time when no one thought about VMs with virtual
> cores. IMHO this maps directly to virtual cores, so you should set it
> according to the number of virtual cores of your VMs.
>
> Cheers,
> Aljoscha
>
> On Mon, 23 Jan 2017 at 11:51 Nancy Estrada <nancy.a.estr...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I have been reading about how to configure Flink when we have a setup
>> consisting of a couple of VMs with more than 1 vCore. I am a bit confused
>> about how to set the degree of parallelism in the
>> taskmanager.numberOfTaskSlots parameter:
>>
>> * According to the Flink documentation[1], this value is typically
>> proportional to the number of /physical CPU cores/ that the TaskManager’s
>> machine has.
>>
>> * However, the YARN documentation[2] makes reference to the number of
>> /Virtual CPU cores/ per TaskManager.
>>
>> My question is: if my Flink jobs will be running on VMs (without using
>> YARN), should "taskmanager.numberOfTaskSlots" depend on the number of
>> vCPUs that my VM has, or must it be related to the physical cores?
>>
>> Thanks in advance for your help!
>> Nancy
>>
>> [1]https://ci.apache.org/projects/flink/flink-docs-release-0.8/config.html
>> [2]https://ci.apache.org/projects/flink/flink-docs-release-0.8/yarn_setup.html
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-configuration-tp11210.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>


Re: Flink configuration

2017-01-24 Thread Aljoscha Krettek
Hi,
that wording is from a time when no one thought about VMs with virtual
cores. IMHO this maps directly to virtual cores, so you should set it
according to the number of virtual cores of your VMs.

Cheers,
Aljoscha

On Mon, 23 Jan 2017 at 11:51 Nancy Estrada <nancy.a.estr...@gmail.com>
wrote:

> Hi all,
>
> I have been reading about how to configure Flink when we have a setup
> consisting of a couple of VMs with more than 1 vCore. I am a bit confused
> about how to set the degree of parallelism in the
> taskmanager.numberOfTaskSlots parameter:
>
> * According to the Flink documentation[1], this value is typically
> proportional to the number of /physical CPU cores/ that the TaskManager’s
> machine has.
>
> * However, the YARN documentation[2] makes reference to the number of
> /Virtual CPU cores/ per TaskManager.
>
> My question is: if my Flink jobs will be running on VMs (without using
> YARN), should "taskmanager.numberOfTaskSlots" depend on the number of
> vCPUs that my VM has, or must it be related to the physical cores?
>
> Thanks in advance for your help!
> Nancy
>
> [1]https://ci.apache.org/projects/flink/flink-docs-release-0.8/config.html
> [2]https://ci.apache.org/projects/flink/flink-docs-release-0.8/yarn_setup.html
>
>
>
>
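Following the advice above, the core count the VM actually exposes can be read programmatically. A small sketch: os.cpu_count() reports logical (i.e. virtual) cores, which on a hyper-threaded host is typically twice the physical count:

```python
import os

# Logical (virtual) core count as seen by this process; on a VM this is
# the number of vCPUs the guest was given.
logical_cores = os.cpu_count() or 1  # cpu_count() can return None
print(f"taskmanager.numberOfTaskSlots: {logical_cores}")
```

The printed line is only a suggestion for what to put into flink-conf.yaml, not something Flink reads automatically.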


Flink configuration

2017-01-23 Thread Nancy Estrada
Hi all,

I have been reading about how to configure Flink when we have a setup
consisting of a couple of VMs with more than 1 vCore. I am a bit confused
about how to set the degree of parallelism in the
taskmanager.numberOfTaskSlots parameter:

* According to the Flink documentation[1], this value is typically
proportional to the number of /physical CPU cores/ that the TaskManager’s
machine has.

* However, the YARN documentation[2] makes reference to the number of
/Virtual CPU cores/ per TaskManager.

My question is: if my Flink jobs will be running on VMs (without using
YARN), should "taskmanager.numberOfTaskSlots" depend on the number of
vCPUs that my VM has, or must it be related to the physical cores?

Thanks in advance for your help! 
Nancy

[1]https://ci.apache.org/projects/flink/flink-docs-release-0.8/config.html
[2]https://ci.apache.org/projects/flink/flink-docs-release-0.8/yarn_setup.html


