Hi, Michael:

I thought about it carefully, and using `startMessageId`
is indeed a good idea. But it is more complicated: we
need to ensure its absolute correctness and take
performance into consideration. If you can come up
with a closed-loop solution based on `startMessageId`,
I will support it. If it cannot satisfy both performance
and correctness, I think we should combine our two
solutions. Your design would be responsible for ensuring
that a certain proportion of messages are not re-delivered,
which reduces the overhead caused by the repeated delivery
of many messages. My design would be responsible for
the final consistency.
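
To make the combination concrete, here is a minimal sketch. It assumes
a hypothetical `Position` type (standing in for a message id) and a
hypothetical `ReconnectDedupFilter` class; neither is an existing Pulsar
API. The last position handed to the application is used twice: as the
start hint sent on reconnect, and as the client-side filter that gives
the final consistency. The sketch is single-threaded and ignores batch
indexes, explicit redelivery requests, and cursor resets.

```java
/** Hypothetical stand-in for a message id: ordered by ledger id, then entry id. */
final class Position implements Comparable<Position> {
    final long ledgerId;
    final long entryId;

    Position(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(Position other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}

/** Hypothetical helper: one piece of state serves both the start hint and the filter. */
final class ReconnectDedupFilter {
    // Last position handed to the application; null until the first delivery.
    private Position lastDelivered;

    /** Hint to send as the start position on reconnect; null means "no hint". */
    Position startHint() {
        return lastDelivered;
    }

    /** Returns true if the message should be handed to the application. */
    boolean accept(Position incoming) {
        if (lastDelivered != null && incoming.compareTo(lastDelivered) <= 0) {
            // Already seen by the application: drop it, even if the hint was not honored.
            return false;
        }
        lastDelivered = incoming;
        return true;
    }
}
```

If the hint is honored, the broker skips most duplicates and `accept`
rarely returns false. If the hint is stale or missing, `accept` still
drops anything at or before the last delivered position.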

Thanks,
Bo

On Wed, Mar 22, 2023 at 14:22 Michael Marshall <mmarsh...@apache.org> wrote:
>
> Because we already send the `startMessageId`, there is a chance where
> we might not even need to update the protocol for the
> CommandSubscribe. In light of that, I quickly put together a PR
> showing how that field might be used to inform the broker where to
> start the read position for the cursor.
>
> https://github.com/apache/pulsar/pull/19892
>
> The PR is not complete, but it does convey the general idea. I wrote
> additional details in the draft's description.
>
> Thanks,
> Michael
>
> On Tue, Mar 21, 2023 at 11:31 PM Michael Marshall <mmarsh...@apache.org> 
> wrote:
> >
> > I am not following your objections to the protocol solution. It might
> > be more productive if I provided a draft PR with a sample
> > implementation. I'm not sure that I'll have time, but I'll try to put
> > something together this week.
> >
> > > At least it will simplify the process of using cumulative ack with the
> > > transaction.
> >
> > Is this the underlying motivation for the PIP?
> >
> > From my perspective, the PIP is seeking to decrease duplicate messages
> > experienced due to disconnections from the broker.
> >
> > > The problem of the resetting cursor can be optimized in the future
> >
> > Why should we push off solving this problem? It seems fundamental to
> > this PIP and should not be ignored. At the very least, I think we need
> > to have an idea of what the future solution would be before we defer
> > its implementation.
> >
> > Thanks,
> > Michael
> >
> >
> > On Tue, Mar 21, 2023 at 10:52 PM 丛搏 <congbobo...@gmail.com> wrote:
> > >
> > > Hi, Michael
> > > > In this case, the consumer does not have the source of truth for the
> > > > readPosition. It would leave the new protocol field for `readPosition`
> > > > empty and the broker would use its source of truth for the read
> > > > position.
> > > > If the application thread has already received all the messages, we
> > > > also need a correct `startPosition`, right? But with your approach, we
> > > > would assume that the consumer hasn't received any messages.
> > >
> > > >
> > > > > > Why do we need to invoke `BlockingQueue.take` and `synchronized` in
> > > > > > the same logic? That is bad code.
> > > >
> > > > We don't need to synchronize this code here because the logic will
> > > > come after the consumer has been disconnected from broker a and before
> > > > it is connected to broker b.
> > > > If the application takes a message from the queue and the consumer
> > > > then reconnects, can the Subscribe command use the right start
> > > > position? Example:
> > > > 1. The application receives one message with `MessageId = 1`.
> > > > 2. The consumer reconnects, discovers the queue is empty, and the
> > > > lastDequeMessageId doesn't change.
> > > > 3. The consumer sends a Subscribe command with MessageId.earliest, so
> > > > the message with `MessageId = 1` will be redelivered from the broker to
> > > > the client consumer, right?
> > > >
> > > > As we can see in the example, the application can receive
> > > > `MessageId = 1` again, right?
> > > > We would not need to lock here because we do not enqueue new messages
> > > > after we've been disconnected from the broker and before we've sent
> > > > CommandSubscribe.
> > > > As the code in [0] shows, the thread has changed.
> > > > Where do we guarantee that no new messages will come in?
> > >
> > > >
> > > > Ultimately, I think a protocol solution will yield better results,
> > > > especially since we'll want to implement this feature in the other
> > > > client languages.
> > > > The problem of the resetting cursor can be optimized in the future,
> > > > but can you ensure the correctness of all the cases I mentioned above?
> > > > IMO, if we use my design, which changes only the client, we don't need
> > > > the broker to make any changes. It's simple and easy to implement, and
> > > > I can make sure it's completely correct. In your design, I currently do
> > > > not see a closed-loop implementation that can achieve this, at least in
> > > > the Java client.
> > >
> > > Thanks,
> > > Bo
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏 <congbobo...@gmail.com> wrote:
> > > > >
> > > > > Hi, Michael:
> > > > >
> > > > > > On Tue, Mar 21, 2023 at 23:17 Michael Marshall <mmarsh...@apache.org> wrote:
> > > > >
> > > > > >
> > > > > > > One more point. Instead of keeping track of the latest message seen by
> > > > > > > the application, the logic in my solution would actually just check
> > > > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > > > recently added), and use that as the read position in the subscribe
> > > > > > > command. If we made this change, we would have to change this code [0]
> > > > > > > to not drop the `incomingMessages` queue.
> > > > >
> > > > > > case 1:
> > > > > > How do we define the message that the application has seen?
> > > > > > I think it is defined at [0]. When the `incomingMessages` queue is
> > > > > > empty, how do we get the correct `startPosition`?
> > > > > > I think we would have to lock the receive logic in [1]:
> > > > > ```
> > > > > synchronized (this) {
> > > > >     message = incomingMessages.take();
> > > > >     messageProcessed(message);
> > > > > }
> > > > > ```
> > > > > > Why do we need to invoke `BlockingQueue.take` and `synchronized` in
> > > > > > the same logic? That is bad code.
> > > > >
> > > > > > case 2:
> > > > > > If we subscribe with `startMessageId`, we should also lock any
> > > > > > enqueue logic, like [2], and check the consumer's current state:
> > > > > ```
> > > > > synchronized (this) {
> > > > >     if (consumer.isConnected) {
> > > > >         if (canEnqueueMessage(message) && 
> > > > > incomingMessages.offer(message)) {
> > > > >             // After we have enqueued the messages on
> > > > > `incomingMessages` queue, we cannot touch the message
> > > > >             // instance anymore, since for pooled messages, this
> > > > > instance was possibly already been released
> > > > >             // and recycled.
> > > > >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, 
> > > > > messageSize);
> > > > >             getMemoryLimitController().ifPresent(limiter ->
> > > > > limiter.forceReserveMemory(messageSize));
> > > > >             updateAutoScaleReceiverQueueHint();
> > > > >         }
> > > > >     }
> > > > > }
> > > > > ```
> > > > > > case 3:
> > > > > > Suppose the Subscribe command is sent to the broker with
> > > > > > `startMessageId = 1`, and before the messages pushed by the broker have
> > > > > > entered the `incomingMessages` queue, the application invokes
> > > > > > redeliver. In this case, we cannot be sure that the messages are
> > > > > > filtered correctly, right?
> > > > >
> > > > > These are some cases that I simply thought of, and there must be
> > > > > others that I haven't thought
> > > > > of. Are you sure we can handle these problems correctly?
> > > > >
> > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > >
> > > > > > I don't think a simple protocol change can solve these problems.
> > > > > > We can't promise that every consumer will receive the broker's reset
> > > > > > cursor request.
> > > > > > When the consumer reconnects, the broker can't send the reset cursor
> > > > > > request to the client consumers, right? In this case, the consumer is
> > > > > > still unaware, right?
> > > > >
> > > > >
> > > > > [0] 
> > > > > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > > > > [1] 
> > > > > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > > > > [2] 
> > > > > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > > > > >
> > > > > > Thanks,
> > > > > > Michael
> > > > > >
> > > > > > [0] 
> > > > > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > >
> > > > > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall 
> > > > > > <mmarsh...@apache.org> wrote:
> > > > > > >
> > > > > > > > > If we add the new field in CommandSubscribe, we should ensure
> > > > > > > > > the synchronization between consumer reconnection and user
> > > > > > > > > calling receive and redeliverUnack method. It will affect the
> > > > > > > > > performance of receive. Exposing synchronization to hot paths
> > > > > > > > > is not a good idea.
> > > > > > >
> > > > > > > > I don't think this is a valid objection. I am pretty sure we already
> > > > > > > > synchronize in the relevant places in the consumer to solve the exact
> > > > > > > > race condition you're concerned about: [0] [1].
> > > > > > > >
> > > > > > > > My proposed operation is to keep track of the latest message id that
> > > > > > > > the application has seen, and then tell the broker that id when
> > > > > > > > sending the Subscribe command. We already do similar logic here [2]
> > > > > > > > [3], but instead of getting the first message id the consumer hasn't
> > > > > > > > seen, we'll get the latest message id seen.
> > > > > > >
> > > > > > > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > > > > > > messages. What is the planned approach? In my understanding, the
> > > > > > > > client will keep track of the latest message id that the application
> > > > > > > > has seen and then will need to compare that message id against every
> > > > > > > > new message. As such, it seems like telling the broker where to start
> > > > > > > > instead of naively checking a filter on every message would be
> > > > > > > > cheaper.
> > > > > > >
> > > > > > > > > As described in Compatibility in PIP. Client consumer doesn't know
> > > > > > > > > Pulsar Admin reset cursor.
> > > > > > >
> > > > > > > > The problem of "the consumer doesn't know" seems like something that
> > > > > > > > is reasonably within the protocol's responsibilities. In this case, an
> > > > > > > > event happens on the broker, and the broker can tell the consumer.
> > > > > > >
> > > > > > > > * <p>Consumers should close when the server resets the cursor,
> > > > > > > > * when the cursor reset success, and then restart. Otherwise,
> > > > > > > > * the consumer will not receive the history messages.
> > > > > > >
> > > > > > > > This is introducing a confusing edge case that requires reading a
> > > > > > > > Javadoc in order to understand. That seems risky to me, and I do not
> > > > > > > > think we should add such an edge case. A new protocol message would
> > > > > > > > easily handle it and make it transparent to the application.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Michael
> > > > > > >
> > > > > > > [0] 
> > > > > > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > > > > > [1] 
> > > > > > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > > > > > [2] 
> > > > > > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > > > > > [3] 
> > > > > > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > > > > > >
> > > > > > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > > > > > <yubiao.f...@streamnative.io.invalid> wrote:
> > > > > > > >
> > > > > > > > +1
> > > > > > > >
> > > > > > > > Hi, Bo :
> > > > > > > >
> > > > > > > > Thanks for your explanation. That makes sense to me.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Yubiao Feng
> > > > > > > >
> > > > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <congbobo...@gmail.com> 
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi, pulsar community:
> > > > > > > > >
> > > > > > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > > > > >
> > > > > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Bo
> > > > > > > > >
