Hi,
After some debugging I see these in the logs:

2024-03-22 14:25:47,555 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Disconnecting
from node 11 due to request timeout.
2024-03-22 14:25:47,647 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Cancelled
in-flight FETCH request with correlation id 8 due to node 11 being
disconnected (elapsed time since creation: 30871ms, elapsed time since
send: 30026ms, request timeout: 30000ms)
2024-03-22 14:25:47,650 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Disconnecting
from node 12 due to request timeout.
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Cancelled
in-flight FETCH request with correlation id 9 due to node 12 being
disconnected (elapsed time since creation: 30871ms, elapsed time since
send: 30036ms, request timeout: 30000ms)
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Disconnecting
from node 14 due to request timeout.
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Cancelled
in-flight FETCH request with correlation id 7 due to node 14 being
disconnected (elapsed time since creation: 30871ms, elapsed time since
send: 30026ms, request timeout: 30000ms)
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Disconnecting
from node 15 due to request timeout.
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] -
[Consumer clientId=qubit-data-consumer-0, groupId=spflink] Cancelled
in-flight FETCH request with correlation id 10 due to node 15 being
disconnected (elapsed time since creation: 30871ms, elapsed time since
send: 30026ms, request timeout: 30000ms)
2024-03-22 14:25:47,652 INFO org.apache.kafka.clients.FetchSessionHandler
[] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Error
sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 11:
org.apache.kafka.common.errors.DisconnectException: null
2024-03-22 14:25:47,657 INFO org.apache.kafka.clients.FetchSessionHandler
[] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Error
sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 12:
org.apache.kafka.common.errors.DisconnectException: null
2024-03-22 14:25:47,657 INFO org.apache.kafka.clients.FetchSessionHandler
[] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Error
sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 14:
org.apache.kafka.common.errors.DisconnectException: null
2024-03-22 14:25:47,658 INFO org.apache.kafka.clients.FetchSessionHandler
[] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Error
sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 15:
org.apache.kafka.common.errors.DisconnectException: null



I think I started seeing this only recently, after I tinkered with
taskmanager.memory.process.size: I reduced it from 4g to 2g.

Any idea why the consumer network client is getting disconnected? Could it
be that this thread is not getting enough resources after the memory
reduction?
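
In case it helps, here is roughly how the source is built. The broker and
topic names are placeholders, and the larger request.timeout.ms is only
something I am considering to rule out client-side timeouts, not a value I
have validated:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Sketch of the KafkaSource setup; names and values below are placeholders.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker-1:9092")        // placeholder
        .setTopics("mytopic")                        // placeholder
        .setGroupId("spflink")
        .setClientIdPrefix("qubit-data-consumer")
        .setStartingOffsets(
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // The Kafka consumer default is 30000 ms, which matches the timeouts
        // in the logs above; raising it is only a way to test, not a fix.
        .setProperty("request.timeout.ms", "60000")
        .build();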

Thanks
Sachin


On Fri, Mar 22, 2024 at 12:48 PM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I was experimenting with different starting offset strategies for my Flink
> job, especially for cases where a job is cancelled and scheduled again:
> I would like to start from the last committed offset, and if that is not
> available, start from the latest.
>
> So I decided to use this:
>
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
>
>
> Now when I start my job in the job master I get is:
>
> Assigning splits to readers {0=[[Partition: mytopic-1, StartingOffset: -3,
> StoppingOffset: -9223372036854775808],
> [Partition: mytopic-2, StartingOffset: -3, StoppingOffset:
> -9223372036854775808]]}
>
> It looks like both the starting and stopping offsets here are negative; I
> am not sure whether this is correct.
> However, what is happening is that no records are getting read from the
> Kafka source.
>
> Can anyone please tell me the right starting offset strategy to use so
> that a new job starts from the last committed offsets, and from the latest
> offset when no committed offsets are available?
>
> Also please note that if I just keep the starting offset strategy as:
>
> .setStartingOffsets(OffsetsInitializer.committedOffsets())
>
> Now say I have cancelled the job and start it again at a much later date;
> by then the committed offsets will no longer be available in the Kafka topic,
>
> as the data would have been discarded based on topic retention policy.
>
>
> Hence just using the committed offsets strategy does not always work.
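>
> In other words, the two variants I am comparing behave differently when
> there is no committed offset for the group (a sketch, not my exact code):
>
> // falls back to the latest offset when the group has no committed offset
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
>
> // fails (reset strategy NONE) when the group has no committed offset
> .setStartingOffsets(OffsetsInitializer.committedOffsets())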
>
>
> Thanks
> Sachin
>
>
>
