Yes, I think you're right. This bug appeared after we switched from
the Pulsar Admin API to the Pulsar Client API. Currently, the
connector doesn't check the existing subscription position. I
apologize for this regression. We need to add tests and implement a
fix. Since this is relatively easy to address, @Igor Basov, would you
like to work on it?

On Fri, May 31, 2024 at 11:18 PM Igor Basov <mrbaz...@gmail.com> wrote:
>
> For the record here, I created an issue FLINK-35477 regarding this.
>
> On Mon, May 27, 2024 at 1:21 PM Igor Basov <mrbaz...@gmail.com> wrote:
>>
>> Ok, believe the breaking changes were introduced in this commit.
>> Here it doesn’t check isResetSubscriptionCursor() anymore.
>> Here it doesn’t check if the subscription already exists anymore.
>>
>>
>> On Thu, May 23, 2024 at 4:31 PM Igor Basov <mrbaz...@gmail.com> wrote:
>>>
>>> Hi everyone,
>>>
>>> I have a problem with how Flink deals with the existing subscription in a 
>>> Pulsar topic.
>>>
>>> Subscription has some accumulated backlog
>>> Flink job is deployed from a clear state (no checkpoints)
>>> Flink job uses the same subscription name as the existing one; the start 
>>> cursor is the default one (earliest)
>>>
>>> Based on the docs here, the priority for setting up the cursor position 
>>> should be: checkpoint > existed subscription position > StartCursor. So, 
>>> since there are no checkpoints, I expect the job to get the existing 
>>> position from Pulsar and start reading from there.
>>> But that’s not what I see. As soon as the job is connected to the topic, I 
>>> see the number of messages in the subscription backlog jumping to a new 
>>> high, and JM logs show messages:
>>>
>>> Seeking subscription to the message -1:-1:-1
>>> Successfully reset subscription to the message -1:-1:-1
>>>
>>> Apparently, Flink ignored the existing subscription position and reset its 
>>> cursor position to the earliest.
>>> The related code seems to be here, but I’m not sure if it takes into 
>>> account the existence of subscriptions.
>>>
>>> Flink: 1.18.1
>>> Pulsar connector: org.apache.flink:flink-connector-pulsar:4.1.0-1.18
>>>
>>> Thanks in advance!
>>>
>>> Best regards,
>>> Igor

Reply via email to