Re: Pulsar connector resets existing subscription

2024-05-31 Thread Igor Basov
For the record, I created an issue for this, FLINK-35477
<https://issues.apache.org/jira/browse/FLINK-35477>.

On Mon, May 27, 2024 at 1:21 PM Igor Basov  wrote:

> OK, I believe the breaking changes were introduced in this commit
> <https://github.com/apache/flink-connector-pulsar/commit/78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858#>.
> Here
> <https://github.com/apache/flink-connector-pulsar/commit/78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858#diff-4db00b10562cef1def73b06f0e2765a650c51954b4cf13487984204495d8a776L231>,
> it no longer checks isResetSubscriptionCursor().
> Here
> <https://github.com/apache/flink-connector-pulsar/commit/78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858#diff-ce7a6c1d29387077c2b19992312c0120bd16580ba5cf9bf222c718dd18a0db2aL86>,
> it no longer checks whether the subscription already exists.
>
> On Thu, May 23, 2024 at 4:31 PM Igor Basov  wrote:
>
>> Hi everyone,
>>
>> I have a problem with how Flink deals with the existing subscription in a
>> Pulsar topic.
>>
>>    - Subscription has some accumulated backlog
>>    - Flink job is deployed from a clean state (no checkpoints)
>>    - Flink job uses the same subscription name as the existing one; the
>>    start cursor is the default one (earliest)
>>
>> Based on the docs here
>> <https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/pulsar/#starting-position>,
>> the priority for setting up the cursor position should be: checkpoint >
>> existed subscription position > StartCursor. So, since there are no
>> checkpoints, I expect the job to get the existing position from Pulsar and
>> start reading from there.
>> But that’s not what I see. As soon as the job connects to the topic,
>> the number of messages in the subscription backlog jumps to a new
>> high, and the JM logs show these messages:
>>
>> Seeking subscription to the message -1:-1:-1
>> Successfully reset subscription to the message -1:-1:-1
>>
>> Apparently, Flink ignored the existing subscription position and reset
>> its cursor to the earliest position.
>> The related code seems to be here
>> <https://github.com/apache/flink-connector-pulsar/blob/b37a8b32f30683664ff25888d403c4de414043e1/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java#L223>,
>> but I’m not sure whether it takes existing subscriptions into account.
>>
>> Flink: 1.18.1
>> Pulsar connector: org.apache.flink:flink-connector-pulsar:4.1.0-1.18
>>
>> Thanks in advance!
>>
>> Best regards,
>> Igor
>>
>


Re: Pulsar connector resets existing subscription

2024-05-27 Thread Igor Basov
OK, I believe the breaking changes were introduced in this commit
<https://github.com/apache/flink-connector-pulsar/commit/78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858#>.
Here
<https://github.com/apache/flink-connector-pulsar/commit/78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858#diff-4db00b10562cef1def73b06f0e2765a650c51954b4cf13487984204495d8a776L231>,
it no longer checks isResetSubscriptionCursor().
Here
<https://github.com/apache/flink-connector-pulsar/commit/78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858#diff-ce7a6c1d29387077c2b19992312c0120bd16580ba5cf9bf222c718dd18a0db2aL86>,
it no longer checks whether the subscription already exists.
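The two removed checks can be summarized as one decision: should the enumerator seek (reset) the subscription or not? The sketch below is a simplified, hypothetical model. `SeekDecision`, `shouldSeekBefore` and `shouldSeekAfter` are illustrative names, and combining the checks as "seek only if the subscription is new or a cursor reset was explicitly requested" is an assumption about the pre-commit behavior, not the connector's actual code; only the `isResetSubscriptionCursor()` check and the subscription-exists check come from the commit discussed above.

```java
/**
 * Hypothetical model of the seek decision affected by commit 78d00ea.
 * Names are illustrative; this is not the connector's implementation.
 */
public class SeekDecision {

    /** Assumed behavior before the commit: both checks are consulted. */
    static boolean shouldSeekBefore(boolean subscriptionExists, boolean resetSubscriptionCursor) {
        // Seek for a brand-new subscription, or when the user explicitly
        // asked to reset the cursor of an existing one.
        return !subscriptionExists || resetSubscriptionCursor;
    }

    /** Observed behavior after the commit: neither check is consulted. */
    static boolean shouldSeekAfter(boolean subscriptionExists, boolean resetSubscriptionCursor) {
        // Both parameters are intentionally ignored: the cursor is always reset.
        return true;
    }

    public static void main(String[] args) {
        // The reported scenario: the subscription exists and no reset was requested.
        boolean exists = true;
        boolean reset = false;
        System.out.println("before: seek=" + shouldSeekBefore(exists, reset));
        System.out.println("after:  seek=" + shouldSeekAfter(exists, reset));
    }
}
```

For the reported scenario the "before" model does not seek, so the existing backlog position is kept, while the "after" model always seeks, which matches the `Seeking subscription to the message -1:-1:-1` log line.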

On Thu, May 23, 2024 at 4:31 PM Igor Basov  wrote:

> Hi everyone,
>
> I have a problem with how Flink deals with the existing subscription in a
> Pulsar topic.
>
>    - Subscription has some accumulated backlog
>    - Flink job is deployed from a clean state (no checkpoints)
>    - Flink job uses the same subscription name as the existing one; the
>    start cursor is the default one (earliest)
>
> Based on the docs here
> <https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/pulsar/#starting-position>,
> the priority for setting up the cursor position should be: checkpoint >
> existed subscription position > StartCursor. So, since there are no
> checkpoints, I expect the job to get the existing position from Pulsar and
> start reading from there.
> But that’s not what I see. As soon as the job connects to the topic, the
> number of messages in the subscription backlog jumps to a new high, and
> the JM logs show these messages:
>
> Seeking subscription to the message -1:-1:-1
> Successfully reset subscription to the message -1:-1:-1
>
> Apparently, Flink ignored the existing subscription position and reset its
> cursor to the earliest position.
> The related code seems to be here
> <https://github.com/apache/flink-connector-pulsar/blob/b37a8b32f30683664ff25888d403c4de414043e1/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java#L223>,
> but I’m not sure whether it takes existing subscriptions into account.
>
> Flink: 1.18.1
> Pulsar connector: org.apache.flink:flink-connector-pulsar:4.1.0-1.18
>
> Thanks in advance!
>
> Best regards,
> Igor
>


Pulsar connector resets existing subscription

2024-05-23 Thread Igor Basov
Hi everyone,

I have a problem with how Flink deals with the existing subscription in a
Pulsar topic.

   - Subscription has some accumulated backlog
   - Flink job is deployed from a clean state (no checkpoints)
   - Flink job uses the same subscription name as the existing one; the
   start cursor is the default one (earliest)

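Based on the docs here
<https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/pulsar/#starting-position>,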
the priority for setting up the cursor position should be: checkpoint >
existed subscription position > StartCursor. So, since there are no
checkpoints, I expect the job to get the existing position from Pulsar and
start reading from there.
But that’s not what I see. As soon as the job connects to the topic, the
number of messages in the subscription backlog jumps to a new high, and
the JM logs show these messages:

Seeking subscription to the message -1:-1:-1
Successfully reset subscription to the message -1:-1:-1

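Apparently, Flink ignored the existing subscription position and reset its
cursor to the earliest position.
The related code seems to be here
<https://github.com/apache/flink-connector-pulsar/blob/b37a8b32f30683664ff25888d403c4de414043e1/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java#L223>,
but I’m not sure whether it takes existing subscriptions into account.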
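To make the expectation concrete, the documented priority (checkpoint > existed subscription position > StartCursor) can be modeled in plain Java. This is a toy sketch: `StartPositionPriority`, `resolve` and the position strings are hypothetical and do not correspond to the connector's API. In the scenario above (no checkpoint, an existing subscription with a backlog, default StartCursor), the model resolves to the existing subscription's position, whereas the logs show a seek to `-1:-1:-1`, Pulsar's earliest message ID.

```java
import java.util.Optional;

/**
 * Toy model of the documented start-position priority for the Pulsar source:
 * checkpoint > existing subscription position > StartCursor.
 * All names are hypothetical; this is not the connector's API.
 */
public class StartPositionPriority {

    /** The position a reader would start from, plus the rule that chose it. */
    record Resolved(String position, String source) {}

    static Resolved resolve(Optional<String> checkpointPosition,
                            Optional<String> existingSubscriptionPosition,
                            String startCursor) {
        // 1. State restored from a checkpoint always wins.
        if (checkpointPosition.isPresent()) {
            return new Resolved(checkpointPosition.get(), "checkpoint");
        }
        // 2. Otherwise reuse the cursor of a subscription that already exists in Pulsar.
        if (existingSubscriptionPosition.isPresent()) {
            return new Resolved(existingSubscriptionPosition.get(), "existing subscription");
        }
        // 3. Only a brand-new subscription falls back to the configured StartCursor.
        return new Resolved(startCursor, "StartCursor");
    }

    public static void main(String[] args) {
        // Reported scenario: no checkpoint, the subscription exists, StartCursor left at earliest.
        Resolved r = resolve(Optional.empty(), Optional.of("existing-cursor"), "earliest");
        System.out.println(r);
    }
}
```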

Flink: 1.18.1
Pulsar connector: org.apache.flink:flink-connector-pulsar:4.1.0-1.18

Thanks in advance!

Best regards,
Igor


Re: Savepoint/checkpoint confusion

2021-05-20 Thread Igor Basov
Hey Robert,
Thanks for the answer! Then I guess the only differences between
savepoints and checkpoints are that checkpoints use a state-backend-specific
format and can be incremental; otherwise they are functionally
equivalent. So, functionally, a savepoint can be considered a full checkpoint
with two additional benefits: it is triggered on demand, and the state
backend can be changed when restoring from it (since 1.13). Is this correct?

On Thu, 20 May 2021 at 05:35, Robert Metzger  wrote:

> Hey Igor,
>
> 1) yes, reactive mode indeed does the same.
> 2) No, HA mode only stores some metadata in ZK about the leadership
> and the latest checkpoints; the checkpoints themselves are the same. They
> should be usable for a changed job graph (if the state can be matched to
> the operators by setting the UUIDs [1]).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/#set-uuids-for-all-operators
>
>
> On Fri, May 7, 2021 at 10:13 PM Igor Basov  wrote:
>
>> Hello,
>> I got confused about the usage of savepoints and checkpoints in different
>> scenarios.
>> I understand that the main purpose of checkpoints is fault tolerance: they
>> are more lightweight and don't support changing the job graph, parallelism,
>> or state backend when restoring from them, as mentioned in the latest 1.13 docs:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#difference-to-savepoints
>>
>> At the same time:
>> 1) Reactive scaling mode (in 1.13) uses checkpoints for exactly that:
>> rescaling.
>> 2) There are use cases like this one:
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/What-happens-when-a-job-is-rescaled-td39462.html
>> where people seem to be using retained checkpoints instead of savepoints
>> to do manual job restarts with rescaling.
>> 3) There are claims like this one:
>>
>> https://lists.apache.org/thread.html/4299518f4da2810aa88fe6b21f841880b619f3f8ac264084a318c034%40%3Cuser.flink.apache.org%3E
>> that in an HA setup the JobManager is able to restart from a checkpoint even
>> if operators are added/removed or parallelism is changed (in this case I'm
>> not sure whether the checkpoints used by the HA JM in
>> `high-availability.storageDir` are the same as regular checkpoints).
>>
>> So I guess the questions are:
>> 1) Can retained checkpoints be safely used for manually restarting and
>> rescaling a job?
>> 2) Are checkpoints made by the HA JM structurally different from the usual
>> ones? Can they be used to restore a job with a changed job graph?
>>
>> Thank you,
>> Igor
>>
>>
>
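Robert's point that a checkpoint should be usable for a changed job graph, as long as its state can be matched to operators through explicit UIDs, can be illustrated with a toy model of that matching step. `UidStateMatching` and `restore` are hypothetical names; only the idea that Flink matches snapshot state to operators by operator ID, and the name of the `--allowNonRestoredState` CLI option that the boolean parameter mirrors, come from Flink.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Toy model of restoring snapshot state into a (possibly changed) job graph.
 * It mimics only the "match state by operator ID" step; not Flink's code.
 */
public class UidStateMatching {

    /**
     * Returns operator ID -> state for operators present in the new graph.
     * State whose operator ID no longer exists is an error unless
     * allowNonRestoredState is set.
     */
    static Map<String, String> restore(Map<String, String> snapshot,
                                       Set<String> newOperatorIds,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        Set<String> unmatched = new HashSet<>();
        for (Map.Entry<String, String> e : snapshot.entrySet()) {
            if (newOperatorIds.contains(e.getKey())) {
                restored.put(e.getKey(), e.getValue());
            } else {
                unmatched.add(e.getKey());
            }
        }
        if (!unmatched.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException("Cannot map state for operators " + unmatched);
        }
        // Operators that are new in the graph simply start with empty state.
        return restored;
    }

    public static void main(String[] args) {
        // Snapshot taken from a job whose operators had explicit UIDs.
        Map<String, String> snapshot = Map.of("source-uid", "offsets", "window-uid", "window contents");
        // Changed job graph: a filter was added, existing UIDs kept stable.
        Set<String> newGraph = Set.of("source-uid", "filter-uid", "window-uid");
        System.out.println(restore(snapshot, newGraph, false));
    }
}
```

Without explicit UIDs, Flink derives operator IDs from the structure of the job graph, so a graph change may alter the IDs of existing operators and their state would then land in the "unmatched" set of this model.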


Savepoint/checkpoint confusion

2021-05-07 Thread Igor Basov
Hello,
I got confused about the usage of savepoints and checkpoints in different
scenarios.
I understand that the main purpose of checkpoints is fault tolerance: they
are more lightweight and don't support changing the job graph, parallelism,
or state backend when restoring from them, as mentioned in the latest 1.13 docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#difference-to-savepoints

At the same time:
1) Reactive scaling mode (in 1.13) uses checkpoints for exactly that:
rescaling.
2) There are use cases like this one:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/What-happens-when-a-job-is-rescaled-td39462.html
where people seem to be using retained checkpoints instead of savepoints to
do manual job restarts with rescaling.
3) There are claims like this one:
https://lists.apache.org/thread.html/4299518f4da2810aa88fe6b21f841880b619f3f8ac264084a318c034%40%3Cuser.flink.apache.org%3E
that in an HA setup the JobManager is able to restart from a checkpoint even
if operators are added/removed or parallelism is changed (in this case I'm
not sure whether the checkpoints used by the HA JM in
`high-availability.storageDir` are the same as regular checkpoints).

So I guess the questions are:
1) Can retained checkpoints be safely used for manually restarting and
rescaling a job?
2) Are checkpoints made by the HA JM structurally different from the usual
ones? Can they be used to restore a job with a changed job graph?

Thank you,
Igor