To me this discussion presupposes that a streaming system should provide
a service like a database. Before we discuss how to implement this,
we should look at whether it fits into the core of Pulsar. I still have
the same concerns about doing this on the broker dispatch side.

What exactly is the delayed delivery use case? Random insertion, dynamic
sorting, and deletion from the top of the sort. That is a priority queue.
It is best implemented as a heap. For larger sets it's some sort of tree
structure. You can simulate that on a database with an index.
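
To make that shape concrete, here is a minimal sketch of the three
operations on a heap, using java.util.PriorityQueue. The class and method
names (DelayHeap, add, pollDue) are made up for illustration; they are not
from PIP-26 or the Pulsar code base.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only: a min-heap of message ids ordered by delivery time.
final class DelayHeap {
    // One pending message: an opaque id plus the time it becomes deliverable.
    static final class Entry {
        final String id;
        final long deliverAtMillis;

        Entry(String id, long deliverAtMillis) {
            this.id = id;
            this.deliverAtMillis = deliverAtMillis;
        }
    }

    // PriorityQueue is a binary heap; the earliest delivery time sits at the top.
    private final PriorityQueue<Entry> heap =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deliverAtMillis));

    // Random insertion: O(log n), in any order of delivery time.
    void add(String id, long deliverAtMillis) {
        heap.add(new Entry(id, deliverAtMillis));
    }

    // Deletion from the top of the sort: remove everything due at or before nowMillis.
    List<String> pollDue(long nowMillis) {
        List<String> due = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek().deliverAtMillis <= nowMillis) {
            due.add(heap.poll().id);
        }
        return due;
    }

    int size() {
        return heap.size();
    }
}
```

Note that everything here lives in memory, which is exactly why the size of
the pending set matters for the design.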

Random insertion and deletion are not what FIFO queues like Pulsar are
designed for.  The closest thing I can think of with Pulsar is to build an
in-mem priority queue in a Pulsar function, feed it from an input topic and
publish the top of the queue into a separate output topic.  In fact the
entire logic proposed in PIP-26 can be done outside the broker in a Pulsar
function.
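
A rough sketch of that relay, under loud assumptions: it uses only the JDK
(java.util.concurrent.DelayQueue), the output topic is stood in for by a
plain Consumer<String> callback, and DelayRelay, onInput and publishDue are
hypothetical names. It is not the Pulsar Functions API, and the queue state
is in memory only, so it is lost on restart.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical relay sitting between an input topic and an output topic.
final class DelayRelay {
    // A message waiting for its delivery time.
    static final class Pending implements Delayed {
        final String payload;
        final long deliverAtMillis;

        Pending(String payload, long deliverAtMillis) {
            this.payload = payload;
            this.deliverAtMillis = deliverAtMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until delivery; zero or negative means "due now".
            return unit.convert(deliverAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final DelayQueue<Pending> queue = new DelayQueue<>();
    private final Consumer<String> output; // stand-in for publishing to the output topic

    DelayRelay(Consumer<String> output) {
        this.output = output;
    }

    // Called for each message read from the input topic.
    void onInput(String payload, long deliverAtMillis) {
        queue.put(new Pending(payload, deliverAtMillis));
    }

    // Publishes every message whose delay has expired and returns how many were sent.
    int publishDue() {
        int published = 0;
        Pending p;
        // poll() returns null when the head of the queue is not yet due.
        while ((p = queue.poll()) != null) {
            output.accept(p.payload);
            published++;
        }
        return published;
    }
}
```

Something outside the sketch would have to call publishDue() periodically,
and acknowledge the input message only after the output publish succeeds.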

For a small scale setup, these distinctions do not matter - you can use a
database as a queue and a queue as a database. But at any larger scale, a
streaming system is not the correct solution for a priority queue use case,
whether it's Pulsar or some other streaming system. So far I have not seen
any mention of the target scale for the design, or the specific use case
requirements.

-joe


On Sat, Jan 19, 2019 at 6:43 PM PengHui Li <codelipeng...@gmail.com> wrote:

> Hi All,
>
> Actually, I also prefer to simplify at broker side.
>
> If pulsar support set arbitrary timeout on each message, if not cluster
> failure or consumer failure,
> it needs to behave normally(on time). Otherwise, user need to understand
> how pulsar dispatching
> messages and how limit of unacked messages change the delay message
> behavior. This may
> lead users to hesitate, this feature may be misused when the user does not
> fully understand how it works.
>
> When user depends arbitrary timeout message feature, users just need to
> keep producer and consumer
> is work well, and administrator of pulsar need to keep pulsar cluster work
> well.
>
> I don't think pulsar is very necessary to support this feature(arbitrary
> timeout message),
> In most scenarios, #3155 can work well, In a few cases, even if support
> arbitrary timeout message in
> client side, i believe that still can not meet the requirement of all
> delayed messages.
>
> To me, i’m not against support arbitrary timeout on each message on client
> side, maybe this is useful
> for other users. In some of our scenarios, we also need a more functional
> alternative(a task service).
>
> Of course, If we can integrate a task service, we can use pulsar to
> guaranteed delivery of messages,
> task service guaranteed send message to pulsar success. Or pulsar broker
> support filter server.
> This way users can implement their own task services.
>
> On Sun, Jan 20, 2019 at 12:28 AM Ezequiel Lovelle <ezequiellove...@gmail.com> wrote:
>
> > > If the goal is to minimize the amount of redeliveries from broker ->
> > client, there are multiple ways to achieve that with the client based
> > approach
> > (eg. send message id and delay time instead of the full payload to
> > consumers
> > as Ivan proposed).
> >
> > But the main reason to put this logic on client side was not adding delay
> > related logic on broker side, in order to do this optimisations the
> broker
> > must be aware of delayed message and only send message id and delay time
> > without payload.
> >
> > > I don't necessarily agree with that. NTP is widely available
> > and understood. Any application that's doing anything time-related would
> > have
> > to make sure the clocks are reasonably synced.
> >
> > Yep, that's true, but from my point of view a system that depends on
> client
> > side clock is weaker than a system that does this kind of calculation at
> > a more controlled environment aka backend. This adds one more factor that
> > depends on the user doing things right, which is not always the case.
> >
> > One possible solution might be the broker send periodically its current
> > epoch time and the client do the calculations with this data, or send
> epoch
> > time initially at subscription and do the rest of calculations doing
> delta
> > of
> > time using the initial time from broker as a base (time flows equally for
> > both
> > the important thing is which one is positioned at the very present time).
> >
> > Anyway this mentioned approach sounds like a hack just from the fact of
> > not doing the time calculations in the backend.
> >
> > > Lastly, i do agree client side approaches have better scalability than
> > server side approaches in most cases. However I don’t believe that it is
> > the case here. And I don’t see anyone have a clear explanation on why a
> > broker approach is less scalable than the client side approach.
> >
> > Yes, I agree with this. At least for fixed time delay at pr #3155.
> >
> > The only remained concern to me would be Gc usage of stored positions
> > next to be expired, anyway, since the nature of a fixed delay and
> > from the fact that process a ledger tend to be in a sequentially manner,
> > we could store a range of positions id for some delta when intensive
> > traffic is going on, I believe I did this mention on the pr.
> >
> > > Again, in general I'm more concerned of stuff that happens in broker
> > because
> > it will have to be scaled up 10s of thousands of times in a single
> > process, while in client typically the requirements are much simpler.
> >
> > I agree that adding logic to broker should be considered with deep care,
> > but in this specific scenario at worst case we will only have one and
> only
> > one scheduled task per consumer which will take all expired positions
> > from a DelayQueue.
> >
> > --
> > *Ezequiel Lovelle*
> >
> >
> > On Sat, 19 Jan 2019 at 01:02, Matteo Merli <matteo.me...@gmail.com>
> wrote:
> >
> > > Just a quick correction:
> > >
> > > > And I don’t see anyone have a clear explanation on why a
> > > broker approach is less scalable than the client side approach.
> > >
> > > I haven't said it less or more scalable. I was meaning that it's
> > > "easier" to scale, in that we don't have to do lots of fancy stuff
> > > and add more and more control to make sure that the implementation
> > > will not become a concern point at scale (eg: limit the overall
> > > amount of memory used in broker, across all topics, and the
> > > impact on GC of these long-living objects).
> > >
> > > > However, clock skew in a brokerside approach
> > > is easier to manage and more predictable, but clock skew in a
> clientside
> > > approach is much harder to manage and more unpredictable
> > >
> > > I don't necessarily agree with that. NTP is widely available
> > > and understood.
> > > Any application that's doing anything time-related would have
> > > to make sure the clocks are reasonably synced.
> > >
> > > --
> > > Matteo Merli
> > > <matteo.me...@gmail.com>
> > >
> > > On Fri, Jan 18, 2019 at 7:46 PM Sijie Guo <guosi...@gmail.com> wrote:
> > > >
> > > > On Sat, Jan 19, 2019 at 9:45 AM Matteo Merli <matteo.me...@gmail.com
> >
> > > wrote:
> > > >
> > > > > Trying to group and compress responses here.
> > > > >
> > > > > > If consumer control the delayed message specific execution time
> we
> > > must
> > > > > trust clock of consumer, this can cause delayed message process
> ahead
> > > of
> > > > > time, some applications cannot tolerate this condition.
> > > > >
> > > > > This is a problem that cannot be solved.
> > > > > Even assuming the timestamps are assigned by brokers and are
> > guaranteed
> > > > > to be monotonic, this won't prevent 2 brokers from having clock
> > skews.
> > > > > That would result in different delivery delays.
> > > > >
> > > > > Similarly, the broker timestamp might be assigned later compared to
> > > when a
> > > > > publisher was "intending" to start the clock.
> > > > >
> > > > > Barring super-precise clock synchronization techniques (which are
> way
> > > out
> > > > > of the scope of this discussion), the only reasonable way to think
> > > about
> > > > > this is
> > > > > that delays needs to be orders of magnitudes bigger than the
> average
> > > clock
> > > > > skew experienced with common techniques (eg: NTP). NTP clock skew
> > will
> > > > > generally be in the 10s of millis. Any delay > 1 seconds will
> hardly
> > be
> > > > > noticeably affected by these skews.
> > > > >
> > > > > Additionally, any optimization on the timeouts handling (like the
> > > > > hash-wheel
> > > > > timer proposed in PIP-26) will trade off precision for efficiency.
> In
> > > that
> > > > > case,
> > > > > the delays are managed in buckets, and can result in higher delays
> > that
> > > > > what was requested.
> > > > >
> > > > > > 1. Fixed timeout, e.g.(with 10s, 30s, 10min delayed), this is the
> > > largest
> > > > > proportion in throughput of delayed message . A subscription with a
> > > fixed
> > > > > delayed time can approach to this scene.
> > > > >
> > > > > I don't think that for fixed delays, any server-side implementation
> > > > > would provide
> > > > > any advantage compared to doing:
> > > > >
> > > > > ```
> > > > > while (true) {
> > > > >     Message msg = consumer.receive();
> > > > > >     long delayMillis = calculateDelay(msg);
> > > > >     if (delayMillis > 0) {
> > > > >         Thread.sleep(delayMillis);
> > > > >     }
> > > > >
> > > > >     // Do something
> > > > >     consumer.acknowledge(msg);
> > > > > }
> > > > > ```
> > > > >
> > > > > This will not need any support from broker. Also, there will be no
> > > > > redeliveries.
> > > > >
> > > > > It could be wrapped in the client API, although I don't see that as
> > > > > big of a problem.
> > > > >
> > > > > > My concern of this category of approaches is "bandwidth" usage.
> It
> > is
> > > > > basically trading bandwidth for complexity.
> > > > >
> > > > > With mixed delays on a single topic, in any case there has to be
> some
> > > kind
> > > > > of time-based sorting of the messages that needs to happen either
> at
> > > broker
> > > > > or at client.
> > > > >
> > > > > Functionally, I believe that either place is equivalent (from a
> user
> > > > > point of view),
> > > > > barring the different implementation requirements.
> > > > >
> > > > > In my view, the bigger cost here is not bandwidth but rather the
> disk
> > > > > IO, that will
> > > > > happen exactly in the same way in both cases. Messages can be
> cached,
> > > > > up to a certain point, either in broker or in client library. After
> > > > > that, in both cases,
> > > > > the messages will have to be fetched from bookies.
> > > > >
> > > > > Also, when implementing the delay feature in the client, the
> existing
> > > > > flow control
> > > > > mechanism is naturally applied to limit the overall amount of
> > > information
> > > > > that
> > > > > we have to keep track (the "currently tracked" messages). Some
> other
> > > > > mechanism
> > > > > would have to be done in the broker as well.
> > > > >
> > > > > Again, in general I'm more concerned of stuff that happens in
> broker
> > > > > because
> > > > > it will have to be scaled up 10s of thousands of times in a single
> > > > > process, while
> > > > > in client typically the requirements are much simpler.
> > > > >
> > > > > If the goal is to minimize the amount of redeliveries from broker
> ->
> > > > > client, there
> > > > > are multiple ways to achieve that with the client based approach
> (eg.
> > > send
> > > > > message id and delay time instead of the full payload to consumers
> as
> > > Ivan
> > > > > proposed).
> > > > >
> > > > > This seems to be simpler and with less overhead than having to
> > persist
> > > > > the whole
> > > > > hash-wheel timer state into a ledger.
> > > >
> > > >
> > > > I agree with that there are many optimizations can be applied at a
> > client
> > > > side approach. In a stable world, these approaches are technically
> > > > equivalent.
> > > >
> > > > However I do not agree with a few points:
> > > >
> > > > First, based on my past production experiences, network bandwidth on
> > > broker
> > > > is the bigger cost than io cost in a multi subscription case. Also, I
> > > have
> > > > heard a few production users have experienced latency issues where
> > broker
> > > > network bandwidth is saturated. So any mechanisms that rely on
> > > redeliveries
> > > > are a big red flag to me.
> > > >
> > > > Secondly, currently pulsar is using more bandwidth on brokers, than
> > > > bandwidth on bookies. It is not a balanced state. I am more leaning
> > > towards
> > > > an approach that can leverage bookies’ idle bandwidth, rather than
> > > > potentially using more bandwidth on brokers.
> > > >
> > > > Thirdly, in my view, clock skew concern is not a technical issue,
> but a
> > > > management issue. As what Ivan and you have pointed out, there are
> many
> > > > ways on addressing clock skew. However, clock skew in a brokerside
> > > approach
> > > > is easier to manage and more predictable, but clock skew in a
> > clientside
> > > > approach is much harder to manage and more unpredictable. This
> > > > unpredictability can significantly change the io or network pattern
> > when
> > > > things go bad. When such unpredictability happens, it can cause bad
> > > things
> > > > and saturating broker network in a redeliver-ish approach. If we are
> > > > building a distributed system that can handle this unpredictability,
> a
> > > > broker-side approach is much more friendly to manageability and
> incident
> > > > management.
> > > >
> > > > Lastly, i do agree client side approaches have better scalability
> than
> > > > server side approaches in most cases. However I don’t believe that it
> > is
> > > > the case here. And I don’t see anyone have a clear explanation on
> why a
> > > > broker approach is less scalable than the client side approach.
> > > >
> > > > Anyway, for manageability, bandwidth usage, client simplicity, I am
> more
> > > in
> > > > favor of a broker side approach, or at least an approach that is not
> > > > redelivery based. However since the feature is requested by Penghui
> > > > and Ezequiel,
> > > > I am also fine with this client side approach if they are okay with
> > that.
> > > >
> > > > - Sijie
> > > >
> > > >
> > > >
> > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Matteo Merli
> > > > > <matteo.me...@gmail.com>
> > > > >
> > > > >
> > > > > On Fri, Jan 18, 2019 at 6:35 AM Ezequiel Lovelle
> > > > > <ezequiellove...@gmail.com> wrote:
> > > > > >
> > > > > > Hi All! and sorry for delay :)
> > > > > >
> > > > > > Probably I'm going to say some things already said, so sorry
> > > beforehand.
> > > > > >
> > > > > > The two main needed features I think are the proposed:
> > > > > > A. Producer delay PIP-26. B. Consumers delay PR #3155
> > > > > >
> > > > > > Of course PIP-26 would result in consumers receiving delayed
> > messages
> > > > > > but the important thing here is one of them made the decision
> about
> > > > > delay.
> > > > > >
> > > > > > First, the easy one, PR #3155. Consumers delay:
> > > > > >
> > > > > > As others have stated before, this is a more trivial approach
> > because
> > > > > > of the nature of having the exactly same period of delay for each
> > > message
> > > > > > which is predictable.
> > > > > >
> > > > > > I agree that adding logic at broker should be avoided, but, for
> > this
> > > > > > specific feature #3155 which I don't think is complex I believe
> > there
> > > > > > are others serious advantages:
> > > > > >
> > > > > >  1. Simplicity at client side, we don't need to add any code
> which
> > is
> > > > > >     less error prone.
> > > > > >  2. Clock issues from client side being outdated and causing
> > headache
> > > > > >     to users detecting this.
> > > > > >  3. Avoids huge overhead delivering non expired messages across
> the
> > > > > >     network unnecessary.
> > > > > >  4. Consumers are free to decide to consume messages with delay
> > > > > regardless
> > > > > >     of the producer.
> > > > > >  5. Delay is uniform for all messages, which sometimes is the
> > > solution
> > > > > >     to the problem rather than arbitrary delays.
> > > > > >
> > > > > > I think that would be great if pulsar can provide this kind of
> > > features
> > > > > > without relying on users needing to know heavy details about the
> > > > > > mechanism.
> > > > > >
> > > > > > For PIP-26:
> > > > > >
> > > > > > I think we can offer this with the purpose of message's with a
> more
> > > long
> > > > > > delay in terms of time? hours / days?
> > > > > >
> > > > > > So, if this is the case, we can assume a small granularity of
> time
> > > like
> > > > > > 1 minute making ledger's representing 1 minute of time and
> > truncating
> > > > > > each time of message for it corresponding minute and storing in
> > that
> > > > > > special ledger.
> > > > > > Users wanting to receive a messages scheduled for some days in
> > future
> > > > > > rarely would care of a margin of error of 1 minute.
> > > > > >
> > > > > > Of course we need somehow make the broker aware of this in order
> to
> > > only
> > > > > > process ledger's for current corresponding minute and consume it.
> > > > > > And the broker would be the one subject to close current minute
> > > truncated
> > > > > > processed ledger.
> > > > > >
> > > > > > One problem I can think about this approach, is it painful for
> > > Bookkeeper
> > > > > > to having a lot of opened ledgers? (one for each minute per
> topic)
> > > > > >
> > > > > > Another problem here might be what happen if consumer was not
> > > started?
> > > > > > At startup time the broker should looking for potentially older
> > > ledger's
> > > > > > than its current time and this might be expensive.
> > > > > >
> > > > > > Other more trivial issue, we might need to refactor current
> > mechanism
> > > > > > which deletes closed ledgers older than the configured time on
> name
> > > > > space.
> > > > > >
> > > > > > As a final note I think that would be great to have both features
> > in
> > > > > pulsar
> > > > > > but sometimes not everything desired is achievable.
> > > > > > And please correct me if I said something senseless.
> > > > > >
> > > > > > --
> > > > > > *Ezequiel Lovelle*
> > > > > >
> > > > > >
> > > > > > On Fri, 18 Jan 2019 at 05:51, PengHui Li <
> codelipeng...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > > So rather than specifying the absolute timestamp that the
> > message
> > > > > > > > should appear to the user, the dispatcher can specify the
> > > relative
> > > > > > > > delay after dispatch that it should appear to the user.
> > > > > > >
> > > > > > > As matteo said the worst case would be that the applied delay
> to
> > be
> > > > > higher
> > > > > > > for some of the messages, if specify the relative delay to
> > > consumer,
> > > > > > > if consumer offline for a period of time, consumer will receive
> > > many
> > > > > > > delayed messages
> > > > > > > after connect to broker again will cause the worst case more
> > > serious.
> > > > > It's
> > > > > > > difficult to keep
> > > > > > > consumers always online.
> > > > > > >
> > > > > > > In my personal perspective, i refer to use `delay level topic`
> to
> > > > > approach
> > > > > > > smaller delays scene.
> > > > > > > e.g(10s-topic, 30s-topic), this will not be too much topic. And
> > we
> > > are
> > > > > > > using dead letter topic to simulate
> > > > > > > delay message feature, delayed topics has different delay
> level.
> > > > > > >
> > > > > > > For very long delays scene, in our practice, user may cancel it
> > or
> > > > > restart
> > > > > > > it.
> > > > > > > After previous discussions, i agree that PIP-26 will make
> broker
> > > > > > > more complexity.
> > > > > > > So I had the idea to consider as a separate mechanism.
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Jan 18, 2019 at 3:22 PM Sijie Guo <guosi...@gmail.com> wrote:
> > > > > > >
> > > > > > > > On Fri, Jan 18, 2019 at 2:51 PM Ivan Kelly <iv...@apache.org
> >
> > > wrote:
> > > > > > > >
> > > > > > > > > One thing missing from this discussion is details on the
> > > motivating
> > > > > > > > > use-case. How many delayed messages per second are we
> > > expecting?
> > > > > And
> > > > > > > > > what is the payload size?
> > > > > > > > >
> > > > > > > > > > If consumer control the delayed message specific
> execution
> > > time
> > > > > we
> > > > > > > must
> > > > > > > > > > trust clock of consumer, this can cause delayed message
> > > process
> > > > > ahead
> > > > > > > > of
> > > > > > > > > > time, some applications cannot tolerate this condition.
> > > > > > > > >
> > > > > > > > > This can be handled in a number of ways. Consumer clocks
> can
> > be
> > > > > skewed
> > > > > > > > > with regard to other clocks, but it is generally safe to
> > assume
> > > > > that
> > > > > > > > > clocks advance at the same rate, especially at the
> > granularity
> > > of a
> > > > > > > > > couple of hours.
> > > > > > > > > So rather than specifying the absolute timestamp that the
> > > message
> > > > > > > > > should appear to the user, the dispatcher can specify the
> > > relative
> > > > > > > > > delay after dispatch that it should appear to the user.
> > > > > > > > >
> > > > > > > > > > > My concern of this category of approaches is
> "bandwidth"
> > > > > usage. It
> > > > > > > is
> > > > > > > > > > > basically trading bandwidth for complexity.
> > > > > > > > > >
> > > > > > > > > > @Sijie Guo <si...@apache.org> Agree with you, such an
> > > trading
> > > > > can
> > > > > > > > cause
> > > > > > > > > the
> > > > > > > > > > broker's out going network to be more serious.
> > > > > > > > >
> > > > > > > > > I don't think PIP-26's approach uses less bandwidth
> in
> > > this
> > > > > > > > > regard. With PIP-26, the msg ids are stored in a ledger,
> and
> > > when
> > > > > the
> > > > > > > > > timeout triggers it dispatches? Are all the delayed message
> > > being
> > > > > > > > > cached at the broker? If so, that is using a lot of memory,
> > and
> > > > > it's
> > > > > > > > > exactly the kind of memory usage pattern that is very bad
> for
> > > JVM
> > > > > > > > > garbage collection. If not, then you have to read the
> message
> > > back
> > > > > in
> > > > > > > > > from bookkeeper, so the bandwidth usage is the same, though
> > on
> > > a
> > > > > > > > > different path.
> > > > > > > > >
> > > > > > > > > In the client side approach, the message could be cached to
> > > avoid a
> > > > > > > > > redispatch. When I was discussing with Matteo, we discussed
> > > this.
> > > > > The
> > > > > > > > > redelivery logic has to be there in any case, as any cache
> > > (broker
> > > > > or
> > > > > > > > > client side) must have a limited size.
> > > > > > > > > Another option would be to skip sending the payload for
> > delayed
> > > > > > > > > messages, and only send it when the client request
> > redelivery,
> > > but
> > > > > > > > > this has the same issue with regard to the entry likely
> > > falling out
> > > > > > > > > the cache at the broker-side.
> > > > > > > >
> > > > > > > >
> > > > > > > > There are bandwidth usage at either approaches for sure. The
> > main
> > > > > > > > difference between broker-side and client-side approaches is
> > > which
> > > > > part
> > > > > > > of
> > > > > > > > the bandwidth is used.
> > > > > > > >
> > > > > > > > In the broker-side approach, it is using the bookies egress
> and
> > > > > broker
> > > > > > > > ingress bandwidth. In a typical pulsar deployment, bookies
> > > egress is
> > > > > > > mostly
> > > > > > > > idle unless there are consumers falling behind.
> > > > > > > >
> > > > > > > > In the client-side approach, it is using broker’s egress
> > > bandwidth
> > > > > and
> > > > > > > > potentially bookies’ egress bandwidth. Brokers’ egress is
> > > critical
> > > > > since
> > > > > > > it
> > > > > > > > is shared across consumers. So if the broker egress is
> doubled,
> > > it
> > > > > is a
> > > > > > > red
> > > > > > > > flag.
> > > > > > > >
> > > > > > > > Although I agree the bandwidth usage depends on workloads.
> But
> > in
> > > > > theory,
> > > > > > > > broker-side approach is more friendly to resource usage and a
> > > better
> > > > > > > > approach to use the resources in a multi layered
> architecture.
> > > > > Because it
> > > > > > > > uses less bandwidth at broker side. A client side can cause
> > more
> > > > > > > bandwidth
> > > > > > > > usage at broker side.
> > > > > > > >
> > > > > > > > Also, as penghui pointed out, clock skew can be another
> > > factor
> > > > > > > causing
> > > > > > > > more traffic in a fanout case. In a broker-side approach, the
> > > > > deferred is
> > > > > > > > done in a central point, so when the deferred time point
> kicks
> > > in,
> > > > > broker
> > > > > > > > just need to read the data one time from bookies. However in
> a
> > > > > > > client-side
> > > > > > > > approach, the messages are asked by different subscriptions,
> > > > > different
> > > > > > > > subscription can ask the deferred message at any time based
> on
> > > their
> > > > > > > > clocks.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > >
> > > > > > > > > -Ivan
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > >
> >
>
