Hi Haiting,

> I think this approach has more potential for abnormal topic isolation.
> If we can introduce some kind of bundle isolation strategy (like a
> broker-bundle affinity and anti-affinity mechanism), we can easily isolate
> unexpected traffic to some empty brokers.
> IMO, this would improve the stability of the broker cluster.

If I understand correctly, suppose we have a partitioned topic with 10
partitions under a namespace with 16 bundles. If the anti-affinity policy is
applied, partition-0 maps to bundle 0, partition-1 maps to bundle 1, and so on.
Of course, not every partitioned topic has to start from bundle 0;
we can use the partition-0 hash to determine the start bundle index.
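
To make that mapping concrete, here is a minimal sketch of what I have in mind. It assumes bundles can be addressed by an index from 0 to 15, and it uses `String.hashCode` only as a stand-in for whatever hash the broker would really use. The class and method names are hypothetical, not existing Pulsar APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class AntiAffinitySketch {

    // Hypothetical helper: derive the start bundle index from the
    // partition-0 topic name. String.hashCode is only a placeholder hash.
    static int startBundle(String topic, int numBundles) {
        return Math.floorMod((topic + "-partition-0").hashCode(), numBundles);
    }

    // partition-i is placed on bundle (start + i) mod numBundles, so
    // consecutive partitions land on consecutive bundles.
    static List<Integer> assign(String topic, int numPartitions, int numBundles) {
        int start = startBundle(topic, numBundles);
        List<Integer> bundles = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            bundles.add((start + i) % numBundles);
        }
        return bundles;
    }

    public static void main(String[] args) {
        // 10 partitions over 16 bundles: every partition gets its own bundle.
        System.out.println(assign("persistent://tenant/ns/my-topic", 10, 16));
    }
}
```

As long as the partition count does not exceed the bundle count, no two partitions of the same topic share a bundle in this sketch; with more partitions than bundles the assignment simply wraps around.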

Do you have an example for affinity? I don't fully understand how this is
used in practice.

Best,
Penghui

On Fri, Feb 18, 2022 at 11:16 PM PengHui Li <peng...@apache.org> wrote:

> Hi Aloys,
>
> > Do you mean that
> > 1. First, add a new API, maybe `getHashPosition`, to get the hash
> > position in a bundle
> > 2. Then use this position to split the overloaded bundle
> > If so, when we split a bundle with multiple partitions of a topic, we need
> > to call `getHashPosition` multiple times to get the middle position of all
> > these positions.
>
> Yes, this is what I mean. In this way, users can choose to assign 1 topic or
> 3 topics to one bundle. This is more like increasing the transparency of
> the topics in the bundle: you can see all the positions of the topics, so
> planning for bundle splitting becomes more flexible.
>
> The new API does not necessarily have to query topic by topic;
> could we list all the "topic -> position" entries of a bundle at once?
>
> Thanks,
> Penghui
>
> On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <jianghait...@apache.org>
> wrote:
>
>> Hi Aloys,
>> +1 for this great PIP.
>>
>> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
>> > will add a new parameter "--topic" or "-t" for the `outstanding topic` name.
>>
>> Do we have any limitation on this "topic" parameter? Can this be a
>> partitioned topic?
>> If so, will this new algorithm split the bundle into more than 2 bundles,
>> e.g. one bundle for each partition?
>>
>> > This algorithm has a disadvantage, it can only deal
>> > with one `outstanding topic`.
>>
>> For this disadvantage, I think it can be solved by extending the "topic"
>> parameter from one topic to a topic list.
>>
>> > The other algorithm is to split the bundle at the hashcode point of the
>> > `outstanding partition`, which will split the bundle into three bundles
>> > at a time. The middle one contains only the hashcode point of the
>> > `outstanding partition`, the left one covers hashes less than the
>> > hashcode, and the right one covers hashes greater than the hashcode.
>> > E.g. if a bundle 0x00_0x10 contains two `outstanding partition`s
>> > (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
>> > algorithm is going to split the bundle into five new bundles:
>> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
>> > partition-y), 0x08_0x10.
>>
>> I think this approach has more potential for abnormal topic isolation.
>> If we can introduce some kind of bundle isolation strategy (like a
>> broker-bundle affinity and anti-affinity mechanism), we can easily isolate
>> unexpected traffic to some empty brokers.
>> IMO, this would improve the stability of the broker cluster.
>>
>> Thanks,
>> Haiting
>>
>> On 2022/02/17 15:47:15 Aloys Zhang wrote:
>> > Hi Pulsar Community,
>> >
>> > This is a PIP discussion on how to support splitting out the partitions
>> > that belong to specified topics in a bundle.
>> >
>> > The issue can be found: https://github.com/apache/pulsar/issues/13761
>> >
>> > I copy the content here for convenience, any suggestions are welcome and
>> > appreciated.
>> >
>> >
>> > ## Motivation
>> >
>> > As we all know, a namespace bundle may contain lots of partitions
>> > belonging to different topics.
>> > The throughput of these topics may vary greatly. Some topics may have a
>> > very high rate/throughput while other topics have a very low
>> > rate/throughput.
>> >
>> > These partitions with high rate/throughput can cause broker overload and
>> > bundle unloading.
>> > At this point, if we split the bundle manually with the
>> > `range_equally_divide` or `topic_count_equally_divide` split algorithm,
>> > many splits may be needed before these high rate/throughput partitions
>> > are assigned to different new bundles.
>> >
>> > For convenience, we call these high throughput topics `outstanding topic`
>> > and their partitions `outstanding partition` in this PIP.
>> >
>> > ## Goal
>> >
>> > Our goal is to make it easier to split `outstanding partition`s into new
>> > bundles.
>> >
>> > There are two alternative ways to achieve this. Either of them will add
>> > a new algorithm for bundle split. The difference is how the new bundle
>> > split algorithm is implemented.
>> >
>> > One algorithm is to split the bundle by `outstanding topic`, which will
>> > split the bundle into two new bundles at a time, each containing an equal
>> > number of `outstanding partition`s.
>> > E.g., a bundle contains lots of topic partitions, and only one
>> > `outstanding topic` (T) with 2 `outstanding partition`s (T-partition-n,
>> > T-partition-n+1).
>> > This algorithm splits the bundle at the middle point of these two
>> > partitions' hashcodes. This algorithm has a disadvantage: it can only
>> > deal with one `outstanding topic`.
>> >
>> > So we came up with another algorithm.
>> >
>> > The other algorithm is to split the bundle at the hashcode point of the
>> > `outstanding partition`, which will split the bundle into three bundles
>> > at a time. The middle one contains only the hashcode point of the
>> > `outstanding partition`, the left one covers hashes less than the
>> > hashcode, and the right one covers hashes greater than the hashcode.
>> > E.g. if a bundle 0x00_0x10 contains two `outstanding partition`s
>> > (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
>> > algorithm is going to split the bundle into five new bundles:
>> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
>> > partition-y), 0x08_0x10.
>> >
>> > ## API Changes
>> >
>> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
>> > will add a new parameter "--topic" or "-t" for the `outstanding topic` name.
>> >
>> > The split interface changed from
>> >
>> > ```java
>> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>> >                           String splitAlgorithmName) throws PulsarAdminException;
>> > ```
>> >
>> > to
>> >
>> > ```java
>> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>> >                           String splitAlgorithmName, String topic)
>> >         throws PulsarAdminException;
>> > ```
>> >
>> > ## Implementation
>> >
>> > There are changes both from the Admin CLI and the broker side.
>> >
>> > First, the Admin CLI for bundle splitting should support specifying the
>> > `outstanding topic`:
>> >
>> > ```java
>> >     /**
>> >      * Split namespace bundle.
>> >      *
>> >      * @param namespace namespace name
>> >      * @param bundle range of bundle to split
>> >      * @param unloadSplitBundles whether to unload the new bundles after the split
>> >      * @param splitAlgorithmName name of the split algorithm to use
>> >      * @param topic name of the outstanding topic
>> >      * @throws PulsarAdminException
>> >      */
>> >     void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>> >                               String splitAlgorithmName, String topic)
>> >             throws PulsarAdminException;
>> > ```
>> >
>> > ```java
>> >     /**
>> >      * Split namespace bundle asynchronously.
>> >      *
>> >      * @param namespace namespace name
>> >      * @param bundle range of bundle to split
>> >      * @param unloadSplitBundles whether to unload the new bundles after the split
>> >      * @param splitAlgorithmName name of the split algorithm to use
>> >      * @param topic name of the outstanding topic
>> >      */
>> >     CompletableFuture<Void> splitNamespaceBundleAsync(
>> >             String namespace, String bundle, boolean unloadSplitBundles,
>> >             String splitAlgorithmName, String topic);
>> > ```
>> >
>> > On the broker side, first encapsulate the parameters for bundle split
>> > into a new class `BundleSplitOption`
>> >
>> > ```java
>> > public class BundleSplitOption {
>> >     private NamespaceService service;
>> >     private NamespaceBundle bundle;
>> >     private String topic;
>> > }
>> > ```
>> >
>> > add a new split algorithm
>> >
>> > ```java
>> > public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm
>> >         implements NamespaceBundleSplitAlgorithm {
>> >     @Override
>> >     public CompletableFuture<List<Long>> getSplitBoundary(
>> >             BundleSplitOption bundleSplitOption) {
>> >         // Method body omitted in this proposal.
>> >     }
>> > }
>> > ```
>> >
>> > add the new algorithm to `NamespaceBundleSplitAlgorithm`
>> >
>> > ```java
>> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";
>> >
>> > List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
>> >         TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
>> >
>> > NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
>> >         new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
>> > ```
>> >
>> > modify `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` in
>> > [NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1)
>> >
>> >
>> > ```java
>> > public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
>> >         NamespaceBundleSplitAlgorithm splitAlgorithm, String topic) {
>> >
>> >     final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
>> >     final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
>> >     splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture,
>> >             splitAlgorithm, topic);
>> >
>> >     return unloadFuture;
>> > }
>> > ```
>> >
>> > ```java
>> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
>> >                                    boolean unload,
>> >                                    AtomicInteger counter,
>> >                                    CompletableFuture<Void> completionFuture,
>> >                                    NamespaceBundleSplitAlgorithm splitAlgorithm,
>> >                                    String topic) {
>> > ```
>> >
>> > Also, we change the REST API and broker.conf
>> >
>> > ```java
>> > public void splitNamespaceBundle(
>> >             @Suspended final AsyncResponse asyncResponse,
>> >             @PathParam("property") String property,
>> >             @PathParam("cluster") String cluster,
>> >             @PathParam("namespace") String namespace,
>> >             @PathParam("bundle") String bundleRange,
>> >             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
>> >             @QueryParam("unload") @DefaultValue("false") boolean unload,
>> >             @QueryParam("topic") @DefaultValue("") String topic) {}
>> > ```
>> >
>> > ```shell
>> > supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
>> > ```
>> >
>>
>
