Meet0861 opened a new issue, #24754: URL: https://github.com/apache/pulsar/issues/24754
### Search before reporting - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Read release policy - [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker. ### User environment Pulsar version : 3.0.12 + cherry-picked - https://github.com/apache/pulsar/pull/22765 But issue is there in master as well. ### Issue Description The **topK bundle** selection fails due to the sorting failure in the partition sort algo being used [here](https://github.com/apache/pulsar/blob/437d9f6a3ccfdfd327ac032c298728fcc43fc263/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java#L115). Partition sort uses, NamespaceBundleStats' + BundleData's custom ccompararable implementations where it checks throughput, connections, cache size etc against the defined threshold and when values are within the defined thresholds, they are considered "equal" (return 0), but this creates transitivity violations. It violates the transitivity property required by Java's Comparable interface, causing [Collections.sort()](https://github.com/apache/pulsar/blob/437d9f6a3ccfdfd327ac032c298728fcc43fc263/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java#L148) to potentially throw IllegalArgumentException with "Comparison method violates its general contract" error. **Error Log**: `2025-09-12T22:29:58.832386492+05:30 16:59:58.832 [pulsar-load-manager-1-1] WARN org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask - Error write resource quota - java.lang.IllegalArgumentException: Comparison method violates its general contract` Due to this failure, the job - **writeBundleDataOnZooKeeper**(leader broker writes bundle data aggregated from all brokers to metadata store) in modularLoadManager may fail [link](https://github.com/apache/pulsar/blob/437d9f6a3ccfdfd327ac032c298728fcc43fc263/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java#L1173) and can cause degradation in productions due to failure/inconsistencies in LB decisions ### Error messages ```text Error Log: `2025-09-12T22:29:58.832386492+05:30 16:59:58.832 [pulsar-load-manager-1-1] WARN org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask - Error write resource quota - java.lang.IllegalArgumentException: Comparison method violates its general contract` ``` ### Reproducing the issue Unit test to validate the bug `@Test public void testPartitionSortTransitivityIssue() { Random rnd = new Random(0); ArrayList<NamespaceBundleStats> stats = new ArrayList<>(); for (int i = 0; i < 1000; ++i) { NamespaceBundleStats s = new NamespaceBundleStats(); s.msgThroughputIn = 4 * 75000 * rnd.nextDouble(); // Just above threshold (1e5) s.msgThroughputOut = 75000000 - (4 * (75000 * rnd.nextDouble())); s.msgRateIn = 4 * 75 * rnd.nextDouble(); s.msgRateOut = 75000 - (4 * 75 * rnd.nextDouble()); s.topics = i; s.consumerCount = i; s.producerCount = 4 * rnd.nextInt(375); s.cacheSize = 75000000 - (rnd.nextInt(4 * 75000)); stats.add(s); } List<Map.Entry<String, ? extends Comparable>> bundleEntries = new ArrayList<>(); for (NamespaceBundleStats s : stats) { bundleEntries.add(new HashMap.SimpleEntry<>("bundle-" + s.msgThroughputIn, s)); } try { TopKBundles.partitionSort(bundleEntries, 100); } catch (IllegalArgumentException e) { // Verify the exception message contains the expected text assertTrue(e.getMessage().contains("Comparison method violates its general contract") || e.getMessage().contains("transitivity") || e.getMessage().contains("comparison"), "Expected IllegalArgumentException about comparison contract violation, got: " + e.getMessage()); } ` ### Additional information **Problematic Code**: `public int compareByBandwidthIn(NamespaceBundleStats other) { if (Math.abs(this.msgThroughputIn - other.msgThroughputIn) > throughputDifferenceThreshold) { return Double.compare(this.msgThroughputIn, other.msgThroughputIn); } return 0; // ← This causes transitivity violations }` ### Are you willing to submit a PR? - [x] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
