Hi Haiting,

> I think this approach has more potential for abnormal topic isolation.
> If we can introduce some kind of bundle isolation strategy (like a
> broker-bundle affinity and anti-affinity mechanism), we can easily
> isolate unexpected traffic to some empty brokers. IMO, this would
> improve the stability of the broker cluster.

If I understand correctly: suppose we have a partitioned topic with 10
partitions under a namespace with 16 bundles. If the anti-affinity policy
is applied, partition-0 maps to bundle 0, partition-1 maps to bundle 1,
and so on. Of course, not every partitioned topic needs to start from
bundle 0; we can use the hash of partition-0 to determine the start
bundle index.

Do you have an example for affinity? I don't fully understand how it is
used in practice.

Best,
Penghui

On Fri, Feb 18, 2022 at 11:16 PM PengHui Li <peng...@apache.org> wrote:

> Hi Aloys,
>
> > Do you mean that
> > 1. First, add a new API, maybe `getHashPosition`, to get the hash
> >    position in a bundle
> > 2. Then use this position to split the overloaded bundle
> > If so, when we split a bundle with multiple partitions of a topic, we
> > need to call `getHashPosition` multiple times to get the middle
> > position of all these positions.
>
> Yes, this is what I mean. In this way, users can choose to assign 1 topic
> or 3 topics to one bundle. This is more like increasing the transparency
> of the topics in the bundle: you can get all the positions of the topics,
> so planning for bundle splitting becomes more flexible.
>
> The new API does not necessarily have to query topic by topic; could we
> list all the "topic -> position" mappings of a bundle?
>
> Thanks,
> Penghui
>
> On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <jianghait...@apache.org>
> wrote:
>
>> Hi Aloys,
>> +1 for this great PIP.
>>
>> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
>> > will add a new parameter "--topic" or "-t" for `outstanding topic` name.
>>
>> Do we have any limitation on this "topic" parameter? Can it be a
>> partitioned topic?
>> If so, will this new algorithm split the bundle into more than 2 bundles,
>> e.g. one bundle per partition?
>>
>> > This algorithm has a disadvantage, it can only deal
>> > with one `outstanding topic`.
>>
>> For this disadvantage, I think it can be solved by extending the "topic"
>> parameter from one topic to a topic list.
>>
>> > The other algorithm is to split the bundle at the hashcode point of the
>> > `outstanding partition`, which splits the bundle into three bundles at
>> > a time. The middle one contains only the hashcode point of the
>> > `outstanding partition`, the left one covers hashcodes less than it,
>> > and the right one covers hashcodes greater than it.
>> > E.g. if a bundle 0x00_0x10 contains two `outstanding partition`s
>> > (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
>> > algorithm splits the bundle into five new bundles:
>> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
>> > partition-y), 0x08_0x10.
>>
>> I think this approach has more potential for abnormal topic isolation.
>> If we can introduce some kind of bundle isolation strategy (like a
>> broker-bundle affinity and anti-affinity mechanism), we can easily
>> isolate unexpected traffic to some empty brokers.
>> IMO, this would improve the stability of the broker cluster.
>>
>> Thanks,
>> Haiting
>>
>> On 2022/02/17 15:47:15 Aloys Zhang wrote:
>> > Hi Pulsar Community,
>> >
>> > This is a PIP discussion on how to support splitting partitions
>> > belonging to specified topics in a bundle.
>> >
>> > The issue can be found: https://github.com/apache/pulsar/issues/13761
>> >
>> > I copy the content here for convenience, any suggestions are welcome and
>> > appreciated.
>> >
>> >
>> > ## Motivation
>> >
>> > As we all know, a namespace bundle may contain lots of partitions
>> > belonging to different topics.
>> > The throughput of these topics may vary greatly. Some topics may have a
>> > very high rate/throughput while other topics have a very low
>> > rate/throughput.
>> >
>> > These partitions with high rate/throughput can cause broker overload and
>> > bundle unloading.
>> > At this point, if we split the bundle manually with the
>> > `range_equally_divide` or `topic_count_equally_divide` split algorithm,
>> > it may take many splits before these high rate/throughput partitions
>> > are assigned to different new bundles.
>> >
>> > For convenience, we call these high throughput topics `outstanding topic`
>> > and their partitions `outstanding partition` in this PIP.
>> >
>> > ## Goal
>> >
>> > Our goal is to make it easier to split `outstanding partition` into new
>> > bundles.
>> >
>> > There are two alternative ways to achieve this. Either of them will add a
>> > new algorithm for bundle split. The difference is how the new bundle
>> > split algorithm is implemented.
>> >
>> > One algorithm is to split the bundle by `outstanding topic`, which
>> > splits the bundle into two new bundles at a time, each containing an
>> > equal number of `outstanding partition`s.
>> > E.g., a bundle contains lots of topic partitions, and only one
>> > `outstanding topic` (T) with 2 `outstanding partition`s (T-partition-n,
>> > T-partition-n+1).
>> > This algorithm splits the bundle at the middle point of these two
>> > partitions' hashcodes. This algorithm has a disadvantage: it can only
>> > deal with one `outstanding topic`.
>> >
>> > So we propose another algorithm.
>> >
>> > The other algorithm is to split the bundle at the hashcode point of the
>> > `outstanding partition`, which splits the bundle into three bundles at
>> > a time. The middle one contains only the hashcode point of the
>> > `outstanding partition`, the left one covers hashcodes less than it,
>> > and the right one covers hashcodes greater than it.
>> > E.g. if a bundle 0x00_0x10 contains two `outstanding partition`s
>> > (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
>> > algorithm splits the bundle into five new bundles:
>> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
>> > partition-y), 0x08_0x10.
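
To make the 0x00_0x10 example above concrete, here is a minimal sketch of how the boundaries could be computed. It is not taken from the PIP or the PR: the class `HashPointSplitSketch` and its `boundaries` method are hypothetical names, and it assumes a bundle range includes its lower bound and excludes its upper bound, which is what the example implies (0x03_0x04 holds the partition whose hashcode is 0x03).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper, not part of Pulsar: computes bundle boundaries that
// isolate each outstanding partition in a bundle covering only its hashcode.
class HashPointSplitSketch {

    /**
     * @param lower  lower bound of the bundle to split (assumed inclusive)
     * @param upper  upper bound of the bundle to split (assumed exclusive)
     * @param hashes hashcodes of the outstanding partitions in the bundle
     * @return sorted boundaries, including lower and upper
     */
    static List<Long> boundaries(long lower, long upper, List<Long> hashes) {
        // A sorted set keeps the boundaries ordered and drops duplicates,
        // e.g. when two outstanding hashcodes are adjacent.
        TreeSet<Long> points = new TreeSet<>();
        points.add(lower);
        points.add(upper);
        for (long h : hashes) {
            if (h < lower || h >= upper) {
                throw new IllegalArgumentException("hash outside bundle: " + h);
            }
            points.add(h);      // start of the isolating bundle
            points.add(h + 1);  // end of the isolating bundle
        }
        return new ArrayList<>(points);
    }

    public static void main(String[] args) {
        List<Long> b = boundaries(0x00, 0x10, List.of(0x03L, 0x07L));
        // Print each pair of consecutive boundaries as one bundle range.
        for (int i = 0; i + 1 < b.size(); i++) {
            System.out.printf("0x%02x_0x%02x%n", b.get(i), b.get(i + 1));
        }
    }
}
```

With the hashcodes 0x03 and 0x07 this yields the boundaries 0x00, 0x03, 0x04, 0x07, 0x08, 0x10, i.e. the five ranges listed in the example.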
>> >
>> > ## API Changes
>> >
>> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
>> > will add a new parameter "--topic" or "-t" for the `outstanding topic`
>> > name.
>> >
>> > The split interface changed from
>> >
>> > ```java
>> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>> >         String splitAlgorithmName) throws PulsarAdminException;
>> > ```
>> >
>> > to
>> >
>> > ```java
>> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>> >         String splitAlgorithmName, String topic)
>> >         throws PulsarAdminException;
>> > ```
>> >
>> > ## Implementation
>> >
>> > There are changes on both the Admin CLI and the broker side.
>> >
>> > First, the Admin CLI for split bundle should support specifying the
>> > `outstanding topic`:
>> >
>> > ```java
>> > /**
>> >  * Split namespace bundle.
>> >  *
>> >  * @param namespace          namespace that owns the bundle
>> >  * @param bundle             range of bundle to split
>> >  * @param unloadSplitBundles whether to unload the new bundles after the split
>> >  * @param splitAlgorithmName name of the bundle split algorithm
>> >  * @param topic              name of the outstanding topic
>> >  * @throws PulsarAdminException
>> >  */
>> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>> >         String splitAlgorithmName, String topic)
>> >         throws PulsarAdminException;
>> > ```
>> >
>> > ```java
>> > /**
>> >  * Split namespace bundle asynchronously.
>> >  *
>> >  * @param namespace          namespace that owns the bundle
>> >  * @param bundle             range of bundle to split
>> >  * @param unloadSplitBundles whether to unload the new bundles after the split
>> >  * @param splitAlgorithmName name of the bundle split algorithm
>> >  * @param topic              name of the outstanding topic
>> >  */
>> > CompletableFuture<Void> splitNamespaceBundleAsync(
>> >         String namespace, String bundle, boolean unloadSplitBundles,
>> >         String splitAlgorithmName, String topic);
>> > ```
>> >
>> > And for the broker side, first encapsulate the parameters for bundle
>> > split into a new class `BundleSplitOption`:
>> >
>> > ```java
>> > public class BundleSplitOption {
>> >     private NamespaceService service;
>> >     private NamespaceBundle bundle;
>> >     private String topic;
>> > }
>> > ```
>> >
>> > Add a new split algorithm:
>> >
>> > ```java
>> > public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm implements
>> >         NamespaceBundleSplitAlgorithm {
>> >     @Override
>> >     public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
>> >         // method body omitted in the proposal
>> >     }
>> > }
>> > ```
>> >
>> > Add the new algorithm to `NamespaceBundleSplitAlgorithm`:
>> >
>> > ```java
>> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE =
>> >         "specified_topic_count_equally_divide";
>> >
>> > List<String> AVAILABLE_ALGORITHMS =
>> >         Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
>> >                 TOPIC_COUNT_EQUALLY_DIVIDE,
>> >                 SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
>> >
>> > NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
>> >         new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
>> > ```
>> >
>> > Modify `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` in
>> > [NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1):
>> >
>> > ```java
>> > public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle,
>> >         boolean unload,
>> >         NamespaceBundleSplitAlgorithm splitAlgorithm, String topic) {
>> >
>> >     final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
>> >     final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
>> >     splitAndOwnBundleOnceAndRetry(bundle, unload, counter,
>> >             unloadFuture, splitAlgorithm, topic);
>> >
>> >     return unloadFuture;
>> > }
>> > ```
>> >
>> > ```java
>> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
>> >         boolean unload,
>> >         AtomicInteger counter,
>> >         CompletableFuture<Void> completionFuture,
>> >         NamespaceBundleSplitAlgorithm splitAlgorithm,
>> >         String topic) {
>> > ```
>> >
>> > Also, we change the REST API and broker.conf:
>> >
>> > ```java
>> > public void splitNamespaceBundle(
>> >         @Suspended final AsyncResponse asyncResponse,
>> >         @PathParam("property") String property,
>> >         @PathParam("cluster") String cluster,
>> >         @PathParam("namespace") String namespace,
>> >         @PathParam("bundle") String bundleRange,
>> >         @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
>> >         @QueryParam("unload") @DefaultValue("false") boolean unload,
>> >         @QueryParam("topic") @DefaultValue("") String topic) {}
>> > ```
>> >
>> > ```shell
>> > supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
>> > ```
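
The proposal leaves the body of `getSplitBoundary` empty, so the following is only a sketch of the split-point arithmetic for the first algorithm (split at the middle point of the outstanding topic's partition hashcodes). It is not the code from the PR: `TopicMidpointSplitSketch` and `splitPoint` are hypothetical names, the hashcodes are passed in directly instead of being looked up through `NamespaceService`, and the bundle range is assumed to include its lower bound and exclude its upper bound.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Pulsar: picks one split point so that
// the outstanding topic's partitions are divided evenly between the two
// new bundles.
class TopicMidpointSplitSketch {

    /**
     * @param partitionHashes hashcodes of the outstanding topic's partitions
     *                        that fall into the bundle being split
     * @return split point s: hashcodes below s stay in the lower bundle,
     *         hashcodes at or above s go to the upper bundle
     */
    static long splitPoint(List<Long> partitionHashes) {
        if (partitionHashes.size() < 2) {
            throw new IllegalArgumentException("need at least two partitions");
        }
        List<Long> sorted = new ArrayList<>(partitionHashes);
        Collections.sort(sorted);
        int half = sorted.size() / 2;
        long left = sorted.get(half - 1);  // last hashcode of the lower half
        long right = sorted.get(half);     // first hashcode of the upper half
        if (left == right) {
            throw new IllegalArgumentException("identical hashcodes cannot be separated");
        }
        // Round the midpoint up so that left < s <= right holds even when
        // the two hashcodes are adjacent.
        return left + (right - left + 1) / 2;
    }

    public static void main(String[] args) {
        System.out.printf("split at 0x%02x%n", splitPoint(List.of(0x03L, 0x07L)));
    }
}
```

For two partitions with hashcodes 0x03 and 0x07 the sketch returns 0x05, so a bundle 0x00_0x10 would become 0x00_0x05 and 0x05_0x10 with one outstanding partition in each.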