Re: Pulsar connector resets existing subscription

2024-06-04 Thread Yufan Sheng
Yes, I think you're right. This bug appeared after we switched from
the Pulsar Admin API to the Pulsar Client API. Currently, the
connector doesn't check the existing subscription position. I
apologize for this regression. We need to add tests and implement a
fix. Since this is relatively easy to address, @Igor Basov, would you
like to work on it?
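To make the intended behavior concrete: the docs describe a start-position priority of checkpoint > existing subscription position > StartCursor. Below is a minimal, self-contained sketch of that decision logic (the class, enum, and method names are hypothetical illustrations, not the actual connector code):

```java
// Hypothetical sketch of the documented start-position priority:
//   checkpoint > existing subscription position > StartCursor.
public class StartPositionResolver {

    enum StartPosition { FROM_CHECKPOINT, FROM_EXISTING_SUBSCRIPTION, FROM_START_CURSOR }

    // Decide where the reader should start, given what state is available.
    static StartPosition resolveStartPosition(boolean hasCheckpoint, boolean subscriptionExists) {
        if (hasCheckpoint) {
            return StartPosition.FROM_CHECKPOINT;
        }
        if (subscriptionExists) {
            // The regression: the current code skips this branch and always
            // seeks to the StartCursor, resetting the existing cursor.
            return StartPosition.FROM_EXISTING_SUBSCRIPTION;
        }
        return StartPosition.FROM_START_CURSOR;
    }

    public static void main(String[] args) {
        // Igor's scenario: fresh deployment (no checkpoint), subscription exists.
        if (resolveStartPosition(false, true) != StartPosition.FROM_EXISTING_SUBSCRIPTION) {
            throw new AssertionError("should resume from the existing subscription");
        }
        if (resolveStartPosition(true, true) != StartPosition.FROM_CHECKPOINT) {
            throw new AssertionError("checkpoint has the highest priority");
        }
        if (resolveStartPosition(false, false) != StartPosition.FROM_START_CURSOR) {
            throw new AssertionError("fall back to the StartCursor");
        }
        System.out.println("ok");
    }
}
```

The fix would restore the middle branch: only seek to the StartCursor when the subscription does not already exist.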

On Fri, May 31, 2024 at 11:18 PM Igor Basov  wrote:
>
> For the record here, I created an issue FLINK-35477 regarding this.
>
> On Mon, May 27, 2024 at 1:21 PM Igor Basov  wrote:
>>
>> Ok, I believe the breaking changes were introduced in this commit.
>> Here it doesn’t check isResetSubscriptionCursor() anymore.
>> Here it doesn’t check if the subscription already exists anymore.
>>
>>
>> On Thu, May 23, 2024 at 4:31 PM Igor Basov  wrote:
>>>
>>> Hi everyone,
>>>
>>> I have a problem with how Flink deals with the existing subscription in a 
>>> Pulsar topic.
>>>
>>> - The subscription has some accumulated backlog
>>> - The Flink job is deployed from a clear state (no checkpoints)
>>> - The Flink job uses the same subscription name as the existing one; the
>>> start cursor is the default one (earliest)
>>>
>>> Based on the docs here, the priority for setting up the cursor position 
>>> should be: checkpoint > existing subscription position > StartCursor. So, 
>>> since there are no checkpoints, I expect the job to pick up the existing 
>>> position from Pulsar and start reading from there.
>>> But that’s not what I see. As soon as the job is connected to the topic, I 
>>> see the number of messages in the subscription backlog jumping to a new 
>>> high, and JM logs show messages:
>>>
>>> Seeking subscription to the message -1:-1:-1
>>> Successfully reset subscription to the message -1:-1:-1
>>>
>>> Apparently, Flink ignored the existing subscription position and reset its 
>>> cursor position to the earliest.
>>> The related code seems to be here, but I’m not sure if it takes into 
>>> account the existence of subscriptions.
>>>
>>> Flink: 1.18.1
>>> Pulsar connector: org.apache.flink:flink-connector-pulsar:4.1.0-1.18
>>>
>>> Thanks in advance!
>>>
>>> Best regards,
>>> Igor


Re: Access ExecutionConfig from new Source and Sink API

2023-04-03 Thread Yufan Sheng
I agree with you. It's quite useful to be able to access the
ExecutionConfig in the Source API. When I was developing
flink-connector-pulsar, the only configuration I couldn't access was the
checkpoint configuration, which is defined in the ExecutionConfig. Since
I can't switch the behavior automatically based on whether checkpointing
is enabled, I have to add extra custom options to the Pulsar Source.
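In the meantime, the early validation Chris describes can only be done against values passed in explicitly (e.g. via custom source options), since the ExecutionConfig itself is unreachable from the FLIP-27 Source. A minimal sketch of such a check (all names here are hypothetical, not an existing connector API):

```java
// Hypothetical early validation for an ack-on-checkpoint source. The values
// must be supplied explicitly via custom options, because the FLIP-27 Source
// API does not expose the job's ExecutionConfig.
public class CheckpointValidation {

    // Throws if the checkpoint settings would make the source never
    // acknowledge messages before the broker's redelivery timeout fires.
    static void validateCheckpointing(boolean checkpointingEnabled,
                                      long checkpointIntervalMs,
                                      long redeliveryTimeoutMs) {
        if (!checkpointingEnabled) {
            throw new IllegalStateException(
                "Checkpointing is disabled: messages would never be acknowledged, "
                + "so the broker would redeliver them and cause duplicates.");
        }
        if (checkpointIntervalMs >= redeliveryTimeoutMs) {
            throw new IllegalStateException(
                "Checkpoint interval (" + checkpointIntervalMs + " ms) must be "
                + "shorter than the redelivery timeout (" + redeliveryTimeoutMs + " ms).");
        }
    }

    public static void main(String[] args) {
        validateCheckpointing(true, 30_000, 60_000); // acceptable settings pass

        boolean threw = false;
        try {
            validateCheckpointing(false, 0, 60_000); // disabled checkpointing fails fast
        } catch (IllegalStateException e) {
            threw = true;
        }
        if (!threw) {
            throw new AssertionError("disabled checkpointing should be rejected");
        }
        System.out.println("ok");
    }
}
```

Failing fast at source construction is preferable to silently accumulating redelivered duplicates at runtime.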

On Mon, Apr 3, 2023 at 1:47 PM Christopher Lee  wrote:
>
> Hello,
>
> I'm trying to develop Flink connectors to NATS using the new FLIP-27 and 
> FLIP-143 APIs. The scaffolding is more complicated than the old 
> SourceFunction and SinkFunction, but not terrible. However, I can't figure out 
> how to access the ExecutionConfig under these new APIs. This was possible in 
> the old APIs by way of the RuntimeContext of the AbstractRichFunction (which 
> are extended by RichSourceFunction and RichSinkFunction).
>
> The reason I would like this is: some interactions with external systems may 
> be invalid under certain Flink job execution parameters. Consider a system 
> like NATS which allows for acknowledgements of messages received. I would 
> ideally acknowledge all received messages by the source connector during 
> checkpointing. If I fail to acknowledge the delivered messages, after a 
> pre-configured amount of time, NATS would resend the message (which is good 
> in my case for fault tolerance).
>
> However, if a Flink job using these connectors has disabled checkpointing or 
> made the interval too large, the connector will never acknowledge delivered 
> messages and the NATS system may send the message again and cause duplicate 
> data. I would be able to avoid this if I could access the ExecutionConfig to 
> check these parameters and throw early.
>
> I know that the SourceReaderContext gives me access to the Configuration, but 
> that doesn't handle the case where the execution-environment is set 
> programmatically in a job definition rather than through configuration. Any 
> ideas?
>
> Thanks,
> Chris


Re: [SURVEY] Drop Share and Key_Shared subscription support in Pulsar connector

2022-12-18 Thread Yufan Sheng
Hi Neng,

Thanks for reminding me to explain the missing functionality. I checked
the missing features with the customer and my teammates, and we found
that the only feature we can't support after removing Shared and
Key_Shared subscription support is delayed message delivery:
https://pulsar.apache.org/docs/2.10.x/concepts-messaging/#delayed-message-delivery.

It is based on the Shared subscription. End users who need it can keep
the message in Flink's state and register an event timer to achieve the
same behavior in Flink. But I don't think we should use delayed message
delivery with Flink at all: in general, we rely on Flink's watermarks to
handle out-of-order messages, which is the problem delayed message
delivery addresses on the Pulsar side.

Best regards,

Yufan
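For users who do need the workaround, the "state plus event timer" idea can be simulated outside Flink with a simple delay queue. In a real job this would be a KeyedProcessFunction that buffers each message in state and registers an event-time timer at eventTime + delay; the class and names below are a hypothetical sketch, not connector code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Minimal simulation of the "buffer the message in state, register an event
// timer" workaround for delayed message delivery. In a real Flink job this
// logic would live in a KeyedProcessFunction, with the buffer in keyed state
// and timers registered via the timer service.
public class DelayedDeliverySim {

    record Pending(long deliverAt, String message) {}

    private final PriorityQueue<Pending> timers =
        new PriorityQueue<>((a, b) -> Long.compare(a.deliverAt, b.deliverAt));
    private final long delayMs;

    DelayedDeliverySim(long delayMs) { this.delayMs = delayMs; }

    // processElement: buffer the message and register a "timer" for later.
    void onMessage(String message, long eventTime) {
        timers.add(new Pending(eventTime + delayMs, message));
    }

    // onTimer: emit every message whose delay has elapsed at this watermark.
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek().deliverAt <= watermark) {
            out.add(timers.poll().message);
        }
        return out;
    }

    public static void main(String[] args) {
        DelayedDeliverySim sim = new DelayedDeliverySim(1_000);
        sim.onMessage("a", 0);   // due at watermark 1000
        sim.onMessage("b", 500); // due at watermark 1500
        if (!sim.onWatermark(999).isEmpty()) throw new AssertionError("nothing due yet");
        if (!sim.onWatermark(1_000).equals(List.of("a"))) throw new AssertionError("a is due");
        if (!sim.onWatermark(2_000).equals(List.of("b"))) throw new AssertionError("b is due");
        System.out.println("ok");
    }
}
```

Note the delay is driven by the watermark, which matches the point above: Flink's own event-time machinery already covers what delayed delivery does on the Pulsar side.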

On Fri, Dec 16, 2022 at 10:49 AM Neng Lu  wrote:
>
> Hi Yufan,
>
> In general, I think it's okay to remove these features.
>
> But could you elaborate on whether there will be missing functionality after 
> we remove support for these two subscriptions?
>
>
> On 2022/12/14 13:01:53 盛宇帆 wrote:
> > Hi Zili,
> >
> > Thanks for picking up this discussion. Here is my answer:
> >
> > I agree with your first point. If the problems are related to
> > Pulsar, they should be reported to the Pulsar repo. But these flaky
> > tests only occur with the Shared or Key_Shared subscription combined
> > with the transaction, and I can't reproduce them on my local machine,
> > so I don't know how to submit useful issues.
> >
> > The performance issue is due to the internal implementation of the
> > Pulsar transaction. Pulsar has to log the ack status in an individual
> > topic, which makes performance extremely slow at high throughput.
> >
> > The only reason I can recall for supporting the Shared subscription
> > when I started is that it allows more consumers on the same partition
> > to increase processing speed. But Flink can increase performance by
> > increasing the parallelism of the downstream operators. The bottleneck
> > isn't consuming messages from Pulsar with an Exclusive subscription,
> > so we don't have to support the Shared subscription for performance.
> >
> > The Key_Shared subscription is only used to distribute messages by
> > their key hash to different consumers in Pulsar, which can be achieved
> > with Flink's keyBy(). And if we want to consume a subset of the key
> > hash range, we have to use an Exclusive subscription with key ranges.
> > This makes the Key_Shared support meaningless.
> >
> > So I prefer to remove them to provide better support for Pulsar in Flink.
> >
> > Best,
> > Yufan
> >
> > On Wed, Dec 14, 2022 at 8:49 PM Zili Chen  wrote:
> > >
> > > Hi Yufan,
> > >
> > > Thanks for starting this discussion. My two coins:
> > >
> > > 1. It can help the upstream to fix the transaction issues by submitting 
> > > the instability and performance issues to the pulsar repo also.
> > > 2. Could you elaborate on whether and (if so) why we should drop the 
> > > Shared and Key_Share subscription support on Flink?
> > >
> > > Best,
> > > tison.
> > >
> > > On 2022/12/14 10:00:56 盛宇帆 wrote:
> > > > Hi, I'm the maintainer of flink-connector-pulsar. I would like to
> > > > start a survey on a function change proposal in
> > > > flink-connector-pulsar.
> > > >
> > > > I have created a ticket
> > > >  on JIRA and pasted
> > > > its description here:
> > > >
> > > > A lot of the Pulsar connector's unstable test issues are related to the
> > > > Shared and Key_Shared subscriptions, because these two subscription
> > > > types are designed to consume records in an unordered way and allow
> > > > multiple consumers on the same topic partition. But this feature leads
> > > > to some drawbacks in the connector.
> > > >
> > > > 1. Performance
> > > >
> > > > Flink is a true stream processor with strong correctness guarantees.
> > > > But supporting multiple consumers requires stronger correctness, which
> > > > depends on the Pulsar transaction. The internal implementation of the
> > > > Pulsar transaction on the source side records messages one by one and
> > > > stores all the pending ack statuses on the client side, which is slow
> > > > and memory inefficient.
> > > >
> > > > This means we can only use Shared and Key_Shared with Flink at low
> > > > throughput, which defeats our intention in supporting these two
> > > > subscription types: adding multiple consumers to the same partition is
> > > > supposed to increase consuming speed.
> > > >
> > > > 2. Unstable
> > > >
> > > > The Pulsar transaction acknowledges messages one by one in an internal
> > > > Pulsar topic, but it's not stable enough to work reliably. A lot of
> > > > pending issues in Flink's JIRA are related to the Pulsar transaction,
> > > > and we don't have any workaround.
> > > >
> > > > 3. Complex
> > > >
> > > > Supporting the Shared and Key_Shared subscriptions makes the
> > > > connector's code more complex than we expect. We have to make every part of code