Hi everyone,

I have a problem with how Flink handles an existing subscription on a
Pulsar topic.

   - The subscription has some accumulated backlog
   - The Flink job is deployed from a clean state (no checkpoints)
   - The Flink job uses the same subscription name as the existing one; the
   start cursor is the default one (earliest)

Based on the docs here
<https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/pulsar/#starting-position>,
the priority for determining the starting cursor position should be:
checkpoint > existing subscription position > StartCursor. Since there are no
checkpoints, I expect the job to pick up the existing position from Pulsar and
start reading from there.
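To make the expected behavior concrete, here is a minimal Java sketch of the documented priority. It is a hypothetical model, not the connector's actual code: `StartPositionPriority`, `Position`, `resolve` and the cursor value `42:7:-1` are made up for illustration. `Position` imitates Pulsar's `ledgerId:entryId:partitionIndex` message ID notation, in which the earliest position is printed as `-1:-1:-1`.

```java
import java.util.Optional;

/**
 * Hypothetical model of the documented start-position priority:
 * checkpoint > existing subscription position > StartCursor.
 * Illustrative only; this is NOT the Flink Pulsar connector's implementation.
 */
public class StartPositionPriority {

    /** Simplified stand-in for a Pulsar message ID (ledgerId:entryId:partitionIndex). */
    record Position(long ledgerId, long entryId, int partitionIndex) {
        @Override
        public String toString() {
            return ledgerId + ":" + entryId + ":" + partitionIndex;
        }
    }

    /** Mirrors how Pulsar prints its "earliest" message ID. */
    static final Position EARLIEST = new Position(-1, -1, -1);

    /** Picks the first available position in the documented priority order. */
    static Position resolve(Optional<Position> checkpointed,
                            Optional<Position> subscriptionCursor,
                            Position startCursor) {
        return checkpointed
                .or(() -> subscriptionCursor) // no checkpoint: use the existing subscription
                .orElse(startCursor);         // neither exists: fall back to StartCursor
    }

    public static void main(String[] args) {
        // Hypothetical cursor of the pre-existing subscription (start of its backlog).
        Position existingCursor = new Position(42, 7, -1);

        // Scenario above: no checkpoint, existing subscription, default StartCursor (earliest).
        Position expected = resolve(Optional.empty(), Optional.of(existingCursor), EARLIEST);

        System.out.println("expected start: " + expected); // prints "expected start: 42:7:-1"
        System.out.println("earliest:       " + EARLIEST); // prints "earliest:       -1:-1:-1"
    }
}
```

Under this model, a seek to `-1:-1:-1` should only happen when neither a checkpoint nor an existing subscription position is available.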
But that’s not what I see. As soon as the job connects to the topic, the
number of messages in the subscription backlog jumps to a new high, and the
JobManager (JM) logs show:

Seeking subscription to the message -1:-1:-1
Successfully reset subscription to the message -1:-1:-1

Apparently, Flink ignored the existing subscription position and reset the
cursor to the earliest position.
The related code seems to be here
<https://github.com/apache/flink-connector-pulsar/blob/b37a8b32f30683664ff25888d403c4de414043e1/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java#L223>,
but I’m not sure whether it takes existing subscriptions into account.

Flink: 1.18.1
Pulsar connector: org.apache.flink:flink-connector-pulsar:4.1.0-1.18

Thanks in advance!

Best regards,
Igor
