Hi,

After some debugging I see these in the logs:

2024-03-22 14:25:47,555 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Disconnecting from node 11 due to request timeout.
2024-03-22 14:25:47,647 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Cancelled in-flight FETCH request with correlation id 8 due to node 11 being disconnected (elapsed time since creation: 30871ms, elapsed time since send: 30026ms, request timeout: 30000ms)
2024-03-22 14:25:47,650 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Disconnecting from node 12 due to request timeout.
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Cancelled in-flight FETCH request with correlation id 9 due to node 12 being disconnected (elapsed time since creation: 30871ms, elapsed time since send: 30036ms, request timeout: 30000ms)
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Disconnecting from node 14 due to request timeout.
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Cancelled in-flight FETCH request with correlation id 7 due to node 14 being disconnected (elapsed time since creation: 30871ms, elapsed time since send: 30026ms, request timeout: 30000ms)
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Disconnecting from node 15 due to request timeout.
2024-03-22 14:25:47,651 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Cancelled in-flight FETCH request with correlation id 10 due to node 15 being disconnected (elapsed time since creation: 30871ms, elapsed time since send: 30026ms, request timeout: 30000ms)
2024-03-22 14:25:47,652 INFO org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 11: org.apache.kafka.common.errors.DisconnectException: null
2024-03-22 14:25:47,657 INFO org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 12: org.apache.kafka.common.errors.DisconnectException: null
2024-03-22 14:25:47,657 INFO org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 14: org.apache.kafka.common.errors.DisconnectException: null
2024-03-22 14:25:47,658 INFO org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=qubit-data-consumer-0, groupId=spflink] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 15: org.apache.kafka.common.errors.DisconnectException: null
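One detail worth noting in these lines: every fetch is cancelled just past the 30000 ms mark, which matches the Kafka consumer's default request.timeout.ms. Purely as a hedged sketch (this raises the limit, it does not explain what is stalling the fetches), the property can be passed through the KafkaSource builder; the broker address is a placeholder and the 60000 value is illustrative, only the group id comes from the logs above:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

// Sketch: raise the Kafka client request timeout (default 30000 ms, the
// value visible in the log lines above). 60000 is illustrative only.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")            // placeholder address
        .setTopics("mytopic")
        .setGroupId("spflink")
        .setProperty("request.timeout.ms", "60000")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();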
I think I started seeing this only recently, after I tinkered with taskmanager.memory.process.size: I reduced it from 4g to 2g. Any idea why the consumer network client is getting disconnected? Is it because this thread is not getting enough resources, or something else? (A config snippet for reverting the memory change is sketched at the end of this message.)

Thanks
Sachin

On Fri, Mar 22, 2024 at 12:48 PM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I was experimenting with different starting offset strategies for my Flink
> job, especially for cases where a job is canceled and scheduled again,
> and I would like to start from the last committed offset, or from the
> latest if the committed offset is not available.
>
> So I decided to use this:
>
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
>
> Now when I start my job, what I get in the job master is:
>
> Assigning splits to readers {0=[[Partition: mytopic-1, StartingOffset: -3,
> StoppingOffset: -9223372036854775808], [Partition: mytopic-2,
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
>
> Looks like both the starting and stopping offsets here are negative; I am
> not sure if this is correct. However, what is happening is that no records
> are getting read from the Kafka source.
>
> Can anyone please tell me the right starting offset strategy to follow,
> where a new job starts from the last committed offsets, or from the latest
> when those are not available?
>
> Also please note that if I just keep the starting offset strategy as:
>
> .setStartingOffsets(OffsetsInitializer.committedOffsets())
>
> and say I have cancelled the job and started it again at a much later
> date, then the committed offsets will no longer be available in the Kafka
> topic, as the data would have been discarded based on the topic retention
> policy. Hence just using the committed offsets strategy does not always
> work.
>
> Thanks
> Sachin
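For context on the split assignment quoted above: in the Flink Kafka connector's KafkaPartitionSplit, a StartingOffset of -3 is the COMMITTED_OFFSET sentinel and a StoppingOffset of -9223372036854775808 (Long.MIN_VALUE) is NO_STOPPING_OFFSET, so the negative values in that log line are expected; the sentinels are resolved against the broker once reading starts. A minimal, self-contained sketch of the fallback strategy discussed in the quoted message, assuming the standard KafkaSource builder (the broker address is a placeholder):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")    // placeholder address
        .setTopics("mytopic")
        .setGroupId("spflink")
        // Start from the group's committed offset; if none exists (e.g.
        // discarded by topic retention), fall back to the latest offset.
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

For comparison, plain OffsetsInitializer.committedOffsets() uses reset strategy NONE, which fails when no committed offset exists, which is why the LATEST (or EARLIEST) fallback is the usual choice for restarts after retention has expired.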
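On the memory change mentioned at the top of this message: taskmanager.memory.process.size caps the TaskManager's total process memory, so reverting it is the quickest way to test whether the 2g setting is starving the consumer. A minimal flink-conf.yaml sketch, simply restoring the value described above:

# flink-conf.yaml: restore the TaskManager total process memory
taskmanager.memory.process.size: 4g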