Re: Job Manager and Task Manager unable to communicate

2023-11-27 Thread Benoit Tailhades
Hello, Tauseef,

Can you give more details? Are your task manager and job manager on the
same VM?

How did you configure the JobManager address in the TaskManager configuration
file?
Did you modify the bind settings in the configuration files?
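
For comparison, on a setup like yours (where flink-jobmanager and
flink-taskmanager look like container or Kubernetes service names), a typical
flink-conf.yaml would contain something like the sketch below. The values are
only placeholders to adapt to your deployment, and depending on your Flink
version some of these keys may not be available:

    jobmanager.rpc.address: flink-jobmanager
    jobmanager.rpc.port: 6123
    jobmanager.bind-host: 0.0.0.0
    taskmanager.host: flink-taskmanager
    taskmanager.bind-host: 0.0.0.0
    # The port in your log (34309) looks like the internal metrics query
    # service; it can be pinned to a fixed value so it stays reachable:
    metrics.internal.query-service.port: 34309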

Benoit

On Mon, Nov 27, 2023 at 14:29, Tauseef Janvekar wrote:

> Dear Team,
>
> We are getting the error messages below in our logs.
> Any help on how to resolve them would be greatly appreciated.
>
> 2023-11-27 08:14:29,712 INFO  org.apache.pekko.remote.transport.ProtocolStateActor [] - No response from remote for outbound association. Associate timed out after [2 ms].
> 2023-11-27 08:14:29,713 WARN  org.apache.pekko.remote.ReliableDeliverySupervisor [] - Association with remote system [pekko.tcp://flink-metrics@flink-taskmanager:34309] has failed, address is now gated for [50] ms. Reason: [Association failed with [pekko.tcp://flink-metrics@flink-taskmanager:34309]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
> 2023-11-27 08:14:29,730 WARN  org.apache.pekko.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with org.jboss.netty.channel.ConnectTimeoutException: connection timed out: flink-taskmanager/172.20.237.127:34309
> 2023-11-27 08:14:58,401 INFO  org.apache.pekko.remote.transport.ProtocolStateActor [] - No response from remote for outbound association. Associate timed out after [2 ms].
> 2023-11-27 08:14:58,402 WARN  org.apache.pekko.remote.ReliableDeliverySupervisor [] - Association with remote system [pekko.tcp://flink-metrics@flink-taskmanager:34309] has failed, address is now gated for [50] ms. Reason: [Association failed with [pekko.tcp://flink-metrics@flink-taskmanager:34309]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
> 2023-11-27 08:14:58,426 WARN  org.apache.pekko.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with org.jboss.netty.channel.ConnectTimeoutException: connection timed out: flink-taskmanager/172.20.237.127:34309
> 2023-11-27 08:15:22,402 INFO  org.apache.pekko.remote.transport.ProtocolStateActor [] - No response from remote for outbound association. Associate timed out after [2 ms].
> 2023-11-27 08:15:22,403 WARN  org.apache.pekko.remote.ReliableDeliverySupervisor [] - Association with remote system [pekko.tcp://flink-metrics@flink-taskmanager:34309] has failed, address is now gated for [50] ms. Reason: [Association failed with [pekko.tcp://flink-metrics@flink-taskmanager:34309]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
> 2023-11-27 08:15:22,434 WARN  org.apache.pekko.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with org.jboss.netty.channel.ConnectTimeoutException: connection timed out: flink-taskmanager/172.20.237.127:34309
> 2023-11-27 08:15:46,411 INFO  org.apache.pekko.remote.transport.ProtocolStateActor [] - No response from remote for outbound association. Associate timed out after [2 ms].
> 2023-11-27 08:15:46,412 WARN  org.apache.pekko.remote.ReliableDeliverySupervisor [] - Association with remote system [pekko.tcp://flink-metrics@flink-taskmanager:34309] has failed, address is now gated for [50] ms. Reason: [Association failed with [pekko.tcp://flink-metrics@flink-taskmanager:34309]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
> 2023-11-27 08:15:46,436 WARN  org.apache.pekko.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with org.jboss.netty.channel.ConnectTimeoutException: connection timed out: flink-taskmanager/172.20.237.127:34309
> 2023-11-27 08:16:10,434 INFO  org.apache.pekko.remote.transport.ProtocolStateActor [] - No response from remote for outbound association. Associate timed out after [2 ms].
> 2023-11-27 08:16:10,435 WARN  org.apache.pekko.remote.ReliableDeliverySupervisor [] - Association with remote system [pekko.tcp://flink-metrics@flink-taskmanager:34309] has failed, address is now gated for [50] ms. Reason: [Association failed with [pekko.tcp://flink-metrics@flink-taskmanager:34309]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
> 2023-11-27 08:16:10,477 WARN  org.apache.pekko.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with org.jboss.netty.channel.ConnectTimeoutException: connection timed out: flink-taskmanager/172.20.237.127:34309
> 2023-11-27 08:16:34,402 WARN  org.apache.pekko.remote.ReliableDeliverySupervisor [] - Association with remote system [pekko.tcp://flink-metrics@flink-taskmanager:34309] has failed, 

Re: Recursive Split Detection + same split optimization

2023-07-10 Thread Benoit Tailhades
Thank you Hang for your answer.

Regarding your proposal 2, implementing such logic would prevent
parallelizing across TMs: from the first ID I will fetch n IDs, but with this
approach all of those IDs would end up being handled by the same TM.
However, I am not totally satisfied with the first choice, which is the one I
implemented, because it relies on the events mechanism, which is a custom
solution. Is there really no built-in Flink mechanism for what I am trying to
achieve?
That said, the solution works perfectly; using events just feels to me like a
workaround for missing functionality.
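
For context, the event round trip I implemented looks roughly like the sketch
below. SubSplitsDiscoveredEvent, MyIdSplit, readerContext and enumContext are
placeholders from my own code; only SourceEvent, sendSourceEventToCoordinator,
handleSourceEvent, currentParallelism and assignSplit come from the Flink
Source API:

    // Custom event carrying the IDs discovered while reading a parent split.
    public class SubSplitsDiscoveredEvent implements SourceEvent {
        private final List<String> ids;
        public SubSplitsDiscoveredEvent(List<String> ids) { this.ids = ids; }
        public List<String> getIds() { return ids; }
    }

    // Reader side, once the sub IDs of ID1 have been fetched:
    readerContext.sendSourceEventToCoordinator(new SubSplitsDiscoveredEvent(childIds));

    // Enumerator side: turn the event into new splits and spread them
    // across the readers (naive hash-based distribution).
    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent event) {
        if (event instanceof SubSplitsDiscoveredEvent) {
            for (String id : ((SubSplitsDiscoveredEvent) event).getIds()) {
                int target = Math.floorMod(id.hashCode(), enumContext.currentParallelism());
                enumContext.assignSplit(new MyIdSplit(id), target);
            }
        }
    }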

Thank you for your valuable help.

Benoit

On Mon, Jul 10, 2023 at 09:19, Hang Ruan wrote:

> Hi, Benoit.
>
> A split enumerator is responsible for discovering the source splits and
> assigning them to the readers. It seems that your connector discovers the
> splits in the TM and assigns them in the JM.
>
> I think there are 2 choices:
> 1. If you need the enumerator to assign splits, you have to send events
> about the splits between the source reader and the enumerator.
> 2. If every reader can use its subtaskId to claim its own scope of the IDs,
> the enumerator does not need to assign anything at all (roughly as sketched
> below).
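>
> A rough illustration of choice 2 (ownedByThisSubtask is just a placeholder
> helper, not part of Flink; only getIndexOfSubtask() comes from
> SourceReaderContext):
>
>     // Each reader derives which IDs it owns from its subtask index, so the
>     // enumerator never has to assign those IDs at all.
>     static boolean ownedByThisSubtask(String id, int subtaskIndex, int parallelism) {
>         return Math.floorMod(id.hashCode(), parallelism) == subtaskIndex;
>     }
>
>     // e.g. in the reader:
>     // ownedByThisSubtask(id, readerContext.getIndexOfSubtask(), parallelism)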
>
> I am not sure whether you are able to move the split-discovery work back
> to the enumerator, for example by running it in multiple threads there.
> Doing it in the TM may be weird and error-prone.
>
>> Finally, I have a second problem, which is about avoiding extracting the
>> same split multiple times. Based on my previous explanation, the same ID
>> might be detected through multiple parent splits. To avoid wasting time
>> doing the same job multiple times, I need to avoid extracting the same ID
>> twice.
>> I am thinking about keeping the already-extracted IDs in my source state,
>> stored in my state backend. What do you think about this?
>>
>
> For choice 1, put that information in the enumerator's state.
> For choice 2, there is no need to consider that issue.
>
> Best,
> Hang
>
> Benoit Tailhades wrote on Mon, Jul 10, 2023 at 12:59:
>
>> Hello Everyone,
>>
>> I am trying to implement a custom source where split detection is an
>> expensive operation, and I would like to use the split reader's results to
>> build my next splits.
>>
>> Basically, my source takes as input an ID from my external system; let's
>> name it ID1.
>>
>> From ID1, I can get a list of other sub splits, but getting this list is
>> an expensive operation, so I want it to be done on a task manager during
>> the split reading of ID1. Now we can imagine the sub splits of ID1 are
>> ID1.1 and ID1.2.
>> So, to sum up, my split reader for ID1 will be responsible for:
>> 1. Collecting the content of ID1
>> 2. Producing n sub splits
>> Then, the split enumerator will receive these sub splits and schedule
>> ID1.1, ..., ID1.n for split reading.
>>
>> As of now, I have implemented this mechanism using events between the
>> split reader and the split enumerator, but I think there might be a better
>> architecture using Flink.
>>
>> Finally, I have a second problem, which is about avoiding extracting the
>> same split multiple times. Based on my previous explanation, the same ID
>> might be detected through multiple parent splits. To avoid wasting time
>> doing the same job multiple times, I need to avoid extracting the same ID
>> twice.
>> I am thinking about keeping the already-extracted IDs in my source state,
>> stored in my state backend. What do you think about this?
>>
>> Thank you.
>>
>> Benoit
>>
>>


Recursive Split Detection + same split optimization

2023-07-09 Thread Benoit Tailhades
Hello Everyone,

I am trying to implement a custom source where split detection is an
expensive operation, and I would like to use the split reader's results to
build my next splits.

Basically, my source takes as input an ID from my external system; let's name
it ID1.

From ID1, I can get a list of other sub splits, but getting this list is an
expensive operation, so I want it to be done on a task manager during the
split reading of ID1. Now we can imagine the sub splits of ID1 are ID1.1 and
ID1.2.
So, to sum up, my split reader for ID1 will be responsible for:
1. Collecting the content of ID1
2. Producing n sub splits
Then, the split enumerator will receive these sub splits and schedule
ID1.1, ..., ID1.n for split reading.

As of now, I have implemented this mechanism using events between the split
reader and the split enumerator, but I think there might be a better
architecture using Flink.

Finally, I have a second problem, which is about avoiding extracting the same
split multiple times. Based on my previous explanation, the same ID might be
detected through multiple parent splits. To avoid wasting time doing the same
job multiple times, I need to avoid extracting the same ID twice.
I am thinking about keeping the already-extracted IDs in my source state,
stored in my state backend. What do you think about this?
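
For illustration, what I have in mind on the enumerator side is roughly the
sketch below. MyIdSplit, MyEnumeratorState, enumContext and seenIds are
placeholders from my own code; only assignSplit and snapshotState come from
the Flink SplitEnumerator API:

    // Keep the set of already-extracted IDs and checkpoint it, so a split is
    // only created the first time an ID is discovered.
    private final Set<String> seenIds = new HashSet<>();

    private void maybeAssign(String id, int targetSubtask) {
        if (seenIds.add(id)) { // true only the first time this ID shows up
            enumContext.assignSplit(new MyIdSplit(id), targetSubtask);
        }
    }

    @Override
    public MyEnumeratorState snapshotState(long checkpointId) throws Exception {
        // The seen-ID set goes into the enumerator checkpoint, and therefore
        // ends up in the configured state backend / checkpoint storage.
        return new MyEnumeratorState(new HashSet<>(seenIds));
    }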

Thank you.

Benoit