Yes, I think you're right. This regression appeared after we switched from the Pulsar Admin API to the Pulsar Client API: the connector currently doesn't check the existing subscription position before seeking to the start cursor. I apologize for this. We need to add tests and implement a fix. Since this is relatively easy to address, @Igor Basov, would you like to work on it?
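
For reference, the priority described in the docs (checkpoint > existing subscription position > StartCursor) could be sketched roughly as below. This is only an illustration, not the connector's code: the class and method names are hypothetical, positions are plain strings standing in for Pulsar message IDs, and the handling of the reset flag (modelled on isResetSubscriptionCursor()) is an assumption about the intended semantics.

```java
import java.util.Optional;

// Hypothetical sketch: none of these names come from the connector's code base.
final class StartPositionSketch {

    /**
     * Picks the position a reader should start from.
     * Positions are plain strings here so the example stays self-contained.
     */
    static String resolve(
            Optional<String> checkpointPosition,
            Optional<String> existingSubscriptionPosition,
            String startCursorPosition,
            boolean resetSubscriptionCursor) {
        // 1. A position restored from a checkpoint always wins.
        if (checkpointPosition.isPresent()) {
            return checkpointPosition.get();
        }
        // 2. Otherwise keep the position of an existing subscription,
        //    unless a reset was explicitly requested (assumed semantics).
        if (existingSubscriptionPosition.isPresent() && !resetSubscriptionCursor) {
            return existingSubscriptionPosition.get();
        }
        // 3. Fall back to the configured StartCursor.
        return startCursorPosition;
    }

    public static void main(String[] args) {
        // The scenario from this thread: no checkpoint, existing subscription, default cursor.
        String position = resolve(Optional.empty(), Optional.of("12:34:-1"), "-1:-1:-1", false);
        System.out.println("Start from: " + position);
    }
}
```

With a check like this, the scenario reported in this thread (no checkpoint, an existing subscription, no reset requested) would resume from the subscription's own position instead of seeking to -1:-1:-1.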

On Fri, May 31, 2024 at 11:18 PM Igor Basov <mrbaz...@gmail.com> wrote:
>
> For the record here, I created an issue FLINK-35477 regarding this.
>
> On Mon, May 27, 2024 at 1:21 PM Igor Basov <mrbaz...@gmail.com> wrote:
>>
>> Ok, believe the breaking changes were introduced in this commit.
>> Here it doesn’t check isResetSubscriptionCursor() anymore.
>> Here it doesn’t check if the subscription already exists anymore.
>>
>>
>> On Thu, May 23, 2024 at 4:31 PM Igor Basov <mrbaz...@gmail.com> wrote:
>>>
>>> Hi everyone,
>>>
>>> I have a problem with how Flink deals with the existing subscription in a
>>> Pulsar topic.
>>>
>>> - Subscription has some accumulated backlog
>>> - Flink job is deployed from a clear state (no checkpoints)
>>> - Flink job uses the same subscription name as the existing one; the start
>>>   cursor is the default one (earliest)
>>>
>>> Based on the docs here, the priority for setting up the cursor position
>>> should be: checkpoint > existed subscription position > StartCursor. So,
>>> since there are no checkpoints, I expect the job to get the existing
>>> position from Pulsar and start reading from there.
>>> But that’s not what I see. As soon as the job is connected to the topic, I
>>> see the number of messages in the subscription backlog jumping to a new
>>> high, and JM logs show messages:
>>>
>>> Seeking subscription to the message -1:-1:-1
>>> Successfully reset subscription to the message -1:-1:-1
>>>
>>> Apparently, Flink ignored the existing subscription position and reset its
>>> cursor position to the earliest.
>>> The related code seems to be here, but I’m not sure if it takes into
>>> account the existence of subscriptions.
>>>
>>> Flink: 1.18.1
>>> Pulsar connector: org.apache.flink:flink-connector-pulsar:4.1.0-1.18
>>>
>>> Thanks in advance!
>>>
>>> Best regards,
>>> Igor