Re: Running Flink on Yarn

2019-01-02 Thread Anil
Hi Andrey. Thanks for the reply. Apologies for the late follow-up; I was
out of office.
Suppose I have 3 TMs, each with 3 task slots, and each Kafka stream has 9
partitions. Each thread will consume from stream 1 (a1) and stream 2
(a2). Considering the query, data will need to be buffered for stream a2.
How will this data be stored internally and shared among threads on task
slots of the same TM and across different TMs?
What's the advantage of having multiple task slots per task manager if I'm
using the YARN deployment mode, where the entire pipeline is deployed in a
single slot?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Running Flink on Yarn

2018-12-24 Thread Andrey Zagrebin
I think the data buffered for the join will be distributed among threads by
order_id (a1 and a2 will be internally keyed).
Each thread will have non-shared window state (for 2 hours) for its own
subset of order_id's.
Slots will share some common JVM resources mentioned in the docs, and also
access to the state DB, but not the majority of the storage occupied by state.
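As a rough illustration of how internally keyed data ends up partitioned
across slots, here is a simplified sketch of Flink-style key-group
assignment (an approximation: the real implementation additionally
murmur-hashes the key's hashCode; 128 is only the default maxParallelism):

```java
import java.util.HashMap;
import java.util.Map;

public class KeyRouting {
    // Simplified key-group routing: each parallel subtask owns a
    // contiguous range of key groups, so every key lands on exactly
    // one subtask (and therefore one slot).
    static int subtaskForKey(Object key, int maxParallelism, int parallelism) {
        int keyGroup = Math.abs(key.hashCode() % maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;   // default in Flink
        int parallelism = 9;        // e.g. 3 TMs x 3 slots
        Map<Integer, Integer> keysPerSubtask = new HashMap<>();
        for (int orderId = 0; orderId < 10_000; orderId++) {
            int subtask = subtaskForKey(orderId, maxParallelism, parallelism);
            keysPerSubtask.merge(subtask, 1, Integer::sum);
        }
        // Each subtask sees a disjoint subset of order_ids, so no buffered
        // join state needs to be shared between slots.
        System.out.println(keysPerSubtask);
    }
}
```

The point of the sketch: sharing buffered join data between slots is
unnecessary, because no two slots are ever responsible for the same order_id.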

cc Timo, Piotr

On Mon, Dec 24, 2018 at 7:46 PM Anil  wrote:

> I am using a time-windowed join only. Here's a sample query -
>
> SELECT a1.order_id, a2.order.restaurant_id
> FROM awz_s3_stream1 a1
> INNER JOIN awz_s3_stream2 a2
>   ON CAST(a1.order_id AS VARCHAR) = a2.order_id
>   AND a1.to_state = 'PLACED'
>   AND a1.proctime BETWEEN a2.proctime - INTERVAL '2' HOUR
>                       AND a2.proctime + INTERVAL '2' HOUR
> GROUP BY HOP(a2.proctime, INTERVAL '2' MINUTE, INTERVAL '1' HOUR),
>          a2.`order`.restaurant_id
>
> Just to simplify my question -
>
> Suppose I have a TM with 4 slots and I deploy a Flink job with
> parallelism=4 in 2 containers - 1 JM and 1 TM. Each parallel instance will
> be deployed in one task slot in the TM (the entire job pipeline running per
> slot). My job does a join (a SQL time-windowed join on a non-keyed stream)
> and buffers the last few hours of data. My question is: will these threads
> running in different task slots share this data buffered for the join? What
> data, if any, is shared across these threads?
>
>
>
>
>


Re: Running Flink on Yarn

2018-12-24 Thread Anil
I am using a time-windowed join only. Here's a sample query -

SELECT a1.order_id, a2.order.restaurant_id
FROM awz_s3_stream1 a1
INNER JOIN awz_s3_stream2 a2
  ON CAST(a1.order_id AS VARCHAR) = a2.order_id
  AND a1.to_state = 'PLACED'
  AND a1.proctime BETWEEN a2.proctime - INTERVAL '2' HOUR
                      AND a2.proctime + INTERVAL '2' HOUR
GROUP BY HOP(a2.proctime, INTERVAL '2' MINUTE, INTERVAL '1' HOUR),
         a2.`order`.restaurant_id

Just to simplify my question - 

Suppose I have a TM with 4 slots and I deploy a Flink job with parallelism=4
in 2 containers - 1 JM and 1 TM. Each parallel instance will be deployed in
one task slot in the TM (the entire job pipeline running per slot). My job
does a join (a SQL time-windowed join on a non-keyed stream) and buffers the
last few hours of data. My question is: will these threads running in
different task slots share this data buffered for the join? What data, if
any, is shared across these threads?






Re: Running Flink on Yarn

2018-12-24 Thread Andrey Zagrebin
If you mean the time-windowed join documented here [1]:
I think it implicitly uses a keyed stream [2], where the key is the field in
the equi-join predicate.
The window state is also keyed [3] in this case.
I also cc Timo and Piotr, they might add more to this topic.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
[2]
https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala#L261
[3]
https://github.com/apache/flink/blob/c8b2ee27d80b1437dd65a9327da65c251febd736/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala#L78

On Mon, Dec 24, 2018 at 6:29 PM Anil  wrote:

> Thanks for the quick response Andrey. I'm doing a SQL time-windowed join on
> a non-keyed stream.
> So will all the threads in the various task slots in the same TM share this
> state?
>
>
>
>


Re: Running Flink on Yarn

2018-12-24 Thread Anil
Thanks for the quick response Andrey. I'm doing a SQL time-windowed join on
a non-keyed stream.
So will all the threads in the various task slots in the same TM share this
state?





Re: Running Flink on Yarn

2018-12-24 Thread Andrey Zagrebin
Hi,

I suppose you are applying windowing to a keyed stream, or a SQL
time-windowed join?
Globally windowed streams are non-parallel and are processed/stored in one slot.

In the case of a keyed stream, the total range of key values is distributed
among slots.
Each slot processes/stores only a subrange of the keys.
Window state is then stored per key.
This implies that each slot stores its own data, not the same data.
Keyed state is not shared among slots in the same JVM.
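As a toy illustration of "window state is stored per key" (this is not
Flink's actual state-backend API; the class and routing are made up for the
example): each parallel subtask keeps its own map from key to window buffer,
so two slots never hold the same entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedWindowStateSketch {
    // One instance of this class per slot/subtask; the maps stay disjoint
    // because each subtask only ever receives its own key subrange.
    final Map<String, List<String>> windowBuffers = new HashMap<>();

    void onEvent(String key, String event) {
        windowBuffers.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
    }

    public static void main(String[] args) {
        KeyedWindowStateSketch slot0 = new KeyedWindowStateSketch();
        KeyedWindowStateSketch slot1 = new KeyedWindowStateSketch();
        // Suppose order_1 is routed to slot0 and order_2 to slot1
        // (the key routing itself is elided here).
        slot0.onEvent("order_1", "PLACED");
        slot1.onEvent("order_2", "PLACED");
        // Neither slot sees the other's state.
        System.out.println(slot0.windowBuffers.containsKey("order_2")); // false
    }
}
```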

Best,
Andrey

> On 23 Dec 2018, at 21:35, Anil  wrote:
> 
> I have a setup of Flink (1.4.2) with YARN. I'm using the Flink YARN Client
> for deploying my jobs to the YARN cluster.
>
> In the current setup, parallelism was directly mapped to the number of cores,
> with each parallel instance of the job running in one container. So for a
> parallelism of 9, there are 10 containers - 1 JM and 9 TMs - and each
> container has 1 core. Each container (i.e. each parallel instance) has one
> task manager, and each slot holds the entire pipeline for the job.
>
> Most of the jobs have a join with the window storing data for the last 2-3
> hours. As per my understanding here,
> each container will save its own copy of this last 2-3 hours of data, and
> this is not shared between two containers.
>
> Since this window data will be the same across each container, I feel that
> if I could have one task manager with multiple task slots that could share
> this window data, I could save a lot of resources (each container wouldn't
> need to maintain its own copy of the window data). If I had 3 containers,
> each with one TM with 3 task slots, then I would need only 3 containers for
> my job to achieve a parallelism of 9 (each task slot will hold the entire
> job pipeline, so each container gives me a parallelism of 3 individually).
> I'm assuming that this window data will be shared among all parallel
> instances running in different task slots in each container. Please
> correct me here.
>
> As per the Flink docs -
>
> Having multiple slots means more subtasks share the same JVM. Tasks in the
> same JVM share TCP connections (via multiplexing) and heartbeat messages.
> They may also share data sets and data structures, thus reducing the
> per-task overhead.
> 
> 
> 
> 
> 



Running Flink on Yarn

2018-12-23 Thread Anil
I have a setup of Flink (1.4.2) with YARN. I'm using the Flink YARN Client
for deploying my jobs to the YARN cluster.

In the current setup, parallelism was directly mapped to the number of cores,
with each parallel instance of the job running in one container. So for a
parallelism of 9, there are 10 containers - 1 JM and 9 TMs - and each
container has 1 core. Each container (i.e. each parallel instance) has one
task manager, and each slot holds the entire pipeline for the job.

Most of the jobs have a join with the window storing data for the last 2-3
hours. As per my understanding here,
each container will save its own copy of this last 2-3 hours of data, and
this is not shared between two containers.

Since this window data will be the same across each container, I feel that
if I could have one task manager with multiple task slots that could share
this window data, I could save a lot of resources (each container wouldn't
need to maintain its own copy of the window data). If I had 3 containers,
each with one TM with 3 task slots, then I would need only 3 containers for
my job to achieve a parallelism of 9 (each task slot will hold the entire
job pipeline, so each container gives me a parallelism of 3 individually).
I'm assuming that this window data will be shared among all parallel
instances running in different task slots in each container. Please
correct me here.

As per the Flink docs -

Having multiple slots means more subtasks share the same JVM. Tasks in the
same JVM share TCP connections (via multiplexing) and heartbeat messages.
They may also share data sets and data structures, thus reducing the
per-task overhead.







Re: Task manager count goes the expand then converge process when running flink on YARN

2018-10-25 Thread Till Rohrmann
Hi Henry,

since version 1.5 you don't need to specify the number of TaskManagers to
start, because the system will figure this out. Moreover, in versions 1.5.x
and 1.6.x it is recommended to set the number of slots per TaskManager to 1,
since we did not support multi-slot TaskManagers properly: the system
started a separate TaskManager for every incoming slot request, even though
there might still be some free slots left. This has been fixed by
FLINK-9455 [1]. The fix will be released with the upcoming major Flink
release 1.7.

[1] https://issues.apache.org/jira/browse/FLINK-9455

Cheers,
Till

On Thu, Oct 25, 2018 at 5:58 AM vino yang  wrote:

> Hi Henry,
>
> The phenomenon you describe does exist; this is a bug, but I can't remember
> its JIRA number.
>
> Thanks, vino.
>
> On Wed, Oct 24, 2018 at 11:27 PM 徐涛 wrote:
>
>> Hi experts
>> I am running flink job on YARN in job cluster mode, the job is divided
>> into 2 tasks, the following are some configs of the job:
>> parallelism.default => 16
>> taskmanager.numberOfTaskSlots => 8
>> -yn => 2
>>
>> when the program starts, I found that the count of task managers is not
>> set immediately, but first expands and then converges. I recorded the
>> numbers during the process:
>> Task Managers   Task Slots   Available Task Slots
>> 1.  14          104           88
>> 2.  15          120          104
>> 3.  16          128          112
>> 4.   6           48           32
>> 5.   3           24            8
>> 6.   2           16            0
>>
>> The final state is correct. There are 2 tasks, 32 subtasks in total; due
>> to slot sharing, only 16 slots are needed. The number of task slots per TM
>> is 8, so 2 TMs are needed.
>> I have the following question:
>> *Because I specify yn=2, why doesn't it directly allocate 2 TMs, but
>> instead goes through the expand-then-converge process? Why does it request
>> up to 16 task managers? If it is not a must, how can I avoid it?*
>>
>> Thanks a lot!
>>
>> Best
>> Henry
>>
>


Re: Task manager count goes the expand then converge process when running flink on YARN

2018-10-24 Thread vino yang
Hi Henry,

The phenomenon you describe does exist; this is a bug, but I can't remember
its JIRA number.

Thanks, vino.

On Wed, Oct 24, 2018 at 11:27 PM 徐涛 wrote:

> Hi experts
> I am running flink job on YARN in job cluster mode, the job is divided
> into 2 tasks, the following are some configs of the job:
> parallelism.default => 16
> taskmanager.numberOfTaskSlots => 8
> -yn => 2
>
> when the program starts, I found that the count of task managers is not
> set immediately, but first expands and then converges. I recorded the
> numbers during the process:
> Task Managers   Task Slots   Available Task Slots
> 1.  14          104           88
> 2.  15          120          104
> 3.  16          128          112
> 4.   6           48           32
> 5.   3           24            8
> 6.   2           16            0
>
> The final state is correct. There are 2 tasks, 32 subtasks in total; due
> to slot sharing, only 16 slots are needed. The number of task slots per TM
> is 8, so 2 TMs are needed.
> I have the following question:
> *Because I specify yn=2, why doesn't it directly allocate 2 TMs, but
> instead goes through the expand-then-converge process? Why does it request
> up to 16 task managers? If it is not a must, how can I avoid it?*
>
> Thanks a lot!
>
> Best
> Henry
>


Task manager count goes the expand then converge process when running flink on YARN

2018-10-24 Thread 徐涛
Hi experts
I am running flink job on YARN in job cluster mode, the job is divided 
into 2 tasks, the following are some configs of the job:
parallelism.default => 16
taskmanager.numberOfTaskSlots => 8
-yn => 2

when the program starts, I found that the count of task managers is not
set immediately, but first expands and then converges. I recorded the
numbers during the process:
Task Managers   Task Slots   Available Task Slots
1.  14          104           88
2.  15          120          104
3.  16          128          112
4.   6           48           32
5.   3           24            8
6.   2           16            0

The final state is correct. There are 2 tasks, 32 subtasks in total; due
to slot sharing, only 16 slots are needed. The number of task slots per TM
is 8, so 2 TMs are needed.
I have the following question:
Because I specify yn=2, why doesn't it directly allocate 2 TMs, but
instead goes through the expand-then-converge process? Why does it request
up to 16 task managers? If it is not a must, how can I avoid it?
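The slot arithmetic behind the final state can be sketched in a few lines
(plain Java for illustration only, not Flink code; variable names mirror the
configs above):

```java
public class SlotMath {
    // ceil(parallelism / slotsPerTm): with slot sharing, the number of
    // slots a job needs equals its maximum operator parallelism.
    static int taskManagersNeeded(int parallelism, int slotsPerTm) {
        return (parallelism + slotsPerTm - 1) / slotsPerTm;
    }

    public static void main(String[] args) {
        int parallelism = 16;   // parallelism.default
        int tasks = 2;          // the job's two tasks
        int slotsPerTm = 8;     // taskmanager.numberOfTaskSlots

        int subtasks = tasks * parallelism;     // 32 subtasks in total
        int slotsNeeded = parallelism;          // 16, thanks to slot sharing
        int tms = taskManagersNeeded(slotsNeeded, slotsPerTm); // 2

        System.out.println(subtasks + " subtasks, " + slotsNeeded
                + " slots, " + tms + " TMs");   // 32 subtasks, 16 slots, 2 TMs
    }
}
```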

Thanks a lot!

Best
Henry




Running flink on YARN

2017-10-12 Thread Navneeth Krishnan
Hello,

I'm running Flink on AWS EMR, and I would like to know how I can pass a
custom log4j properties file. I changed the log4j.properties file in the
Flink conf directory, but it doesn't seem like the changes are reflected.
Thanks.

I'm using the below command to start my flink job.
> flink run -m yarn-cluster
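One thing that may be worth trying (an assumption on my side, not verified
on EMR, and the paths below are illustrative): the YARN client ships the
files it finds in FLINK_CONF_DIR to the containers, so pointing the client
at a conf directory containing your modified log4j.properties might work:

```shell
# Copy the shipped conf dir somewhere writable and edit the copy
# (illustrative EMR path; adjust to your install).
cp -r /usr/lib/flink/conf /tmp/flink-conf
# ... edit /tmp/flink-conf/log4j.properties here ...

# Point the client at the custom conf dir before submitting.
export FLINK_CONF_DIR=/tmp/flink-conf
flink run -m yarn-cluster ...
```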

Regards,
Navneeth