For the record here, I created an issue FLINK-35477
<https://issues.apache.org/jira/browse/FLINK-35477> regarding this.

On Mon, May 27, 2024 at 1:21 PM Igor Basov <mrbaz...@gmail.com> wrote:

> Ok, believe the breaking changes were introduced in this commit
> <https://github.com/apache/flink-connector-pulsar/commit/78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858#>
> .
> Here
> <https://github.com/apache/flink-connector-pulsar/commit/78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858#diff-4db00b10562cef1def73b06f0e2765a650c51954b4cf13487984204495d8a776L231>
> it doesn’t check isResetSubscriptionCursor() anymore.
> Here
> <https://github.com/apache/flink-connector-pulsar/commit/78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858#diff-ce7a6c1d29387077c2b19992312c0120bd16580ba5cf9bf222c718dd18a0db2aL86>
> it doesn’t check if the subscription already exists anymore.
>
> On Thu, May 23, 2024 at 4:31 PM Igor Basov <mrbaz...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> I have a problem with how Flink deals with the existing subscription in a
>> Pulsar topic.
>>
>>    - Subscription has some accumulated backlog
>>    - Flink job is deployed from a clear state (no checkpoints)
>>    - Flink job uses the same subscription name as the existing one; the
>>    start cursor is the default one (earliest)
>>
>> Based on the docs here
>> <https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/pulsar/#starting-position>,
>> the priority for setting up the cursor position should be: checkpoint >
>> existed subscription position > StartCursor. So, since there are no
>> checkpoints, I expect the job to get the existing position from Pulsar and
>> start reading from there.
>> But that’s not what I see. As soon as the job is connected to the topic,
>> I see the number of messages in the subscription backlog jumping to a new
>> high, and JM logs show messages:
>>
>> Seeking subscription to the message -1:-1:-1
>> Successfully reset subscription to the message -1:-1:-1
>>
>> Apparently, Flink ignored the existing subscription position and reset
>> its cursor position to the earliest.
>> The related code seems to be here
>> <https://github.com/apache/flink-connector-pulsar/blob/b37a8b32f30683664ff25888d403c4de414043e1/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java#L223>,
>> but I’m not sure if it takes into account the existence of subscriptions.
>>
>> Flink: 1.18.1
>> Pulsar connector: org.apache.flink:flink-connector-pulsar:4.1.0-1.18
>>
>> Thanks in advance!
>>
>> Best regards,
>> Igor
>>
>

Reply via email to