Meet0861 opened a new issue, #24754:
URL: https://github.com/apache/pulsar/issues/24754

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported 
versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions)
 don't get bug fixes. I will attempt to reproduce the issue on a supported 
version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
   Pulsar version: 3.0.12 with 
https://github.com/apache/pulsar/pull/22765 cherry-picked.
   The issue is also present on master.
   
   ### Issue Description
   
   The **topK bundle** selection fails because the partition sort used 
[here](https://github.com/apache/pulsar/blob/437d9f6a3ccfdfd327ac032c298728fcc43fc263/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java#L115)
 can throw during sorting.
   The partition sort relies on the custom `Comparable` implementations of 
`NamespaceBundleStats` and `BundleData`, which compare throughput, connections, 
cache size, etc. against configured thresholds. When two values differ by no 
more than the threshold, they are treated as "equal" (the comparison returns 0). 
   This breaks the transitivity property required by Java's `Comparable` 
contract: A can equal B and B can equal C while A is still less than C. As a 
result, 
[Collections.sort()](https://github.com/apache/pulsar/blob/437d9f6a3ccfdfd327ac032c298728fcc43fc263/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java#L148)
 may throw `IllegalArgumentException` with the message "Comparison method 
violates its general contract".
   **Error Log**:
   `2025-09-12T22:29:58.832386492+05:30 16:59:58.832 [pulsar-load-manager-1-1] 
WARN  org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask - Error 
write resource quota - java.lang.IllegalArgumentException: Comparison method 
violates its general contract`
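   
   To make the violation concrete, here is a minimal standalone sketch (not 
Pulsar code). The class name, the `thresholdCompare` helper, and the sample 
values are hypothetical; the `1e5` threshold is an assumption taken from the 
comment in the unit test in this issue. It shows three values where the first 
two pairs compare as equal but the outer pair does not:
   
   ```java
   public class ThresholdTransitivityDemo {
       // Assumed threshold for illustration only (1e5, as mentioned in the issue's test).
       public static final double THRESHOLD = 1e5;

       // Same shape as compareByBandwidthIn: values within THRESHOLD compare as equal.
       public static int thresholdCompare(double a, double b) {
           if (Math.abs(a - b) > THRESHOLD) {
               return Double.compare(a, b);
           }
           return 0;
       }

       public static void main(String[] args) {
           double a = 0.0;
           double b = 80_000.0;
           double c = 160_000.0;
           System.out.println(thresholdCompare(a, b)); // 0  (a "equals" b)
           System.out.println(thresholdCompare(b, c)); // 0  (b "equals" c)
           System.out.println(thresholdCompare(a, c)); // -1 (but a < c)
       }
   }
   ```
   
   A sort implementation is entitled to assume that `a == b` and `b == c` imply 
`a == c`, so an ordering like this one can trip the consistency checks inside 
`Collections.sort()`.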
   
   Because of this failure, the **writeBundleDataOnZooKeeper** job (in which 
the leader broker writes bundle data aggregated from all brokers to the 
metadata store) in `ModularLoadManagerImpl` may fail 
([link](https://github.com/apache/pulsar/blob/437d9f6a3ccfdfd327ac032c298728fcc43fc263/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java#L1173)),
 which can degrade production clusters through failed or inconsistent 
load-balancing decisions.
   
   ### Error messages
   
   ```text
   2025-09-12T22:29:58.832386492+05:30 16:59:58.832 [pulsar-load-manager-1-1] 
WARN  org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask - Error 
write resource quota - java.lang.IllegalArgumentException: Comparison method 
violates its general contract
   ```
   
   ### Reproducing the issue
   
   Unit test to reproduce the bug:
   `@Test
       public void testPartitionSortTransitivityIssue() {
           Random rnd = new Random(0);
           ArrayList<NamespaceBundleStats> stats = new ArrayList<>();
           for (int i = 0; i < 1000; ++i) {
               NamespaceBundleStats s = new NamespaceBundleStats();
               // Values in [0, 3e5), spread around the threshold (1e5)
               s.msgThroughputIn = 4 * 75000 * rnd.nextDouble();
               s.msgThroughputOut = 75000000 - (4 * (75000 * rnd.nextDouble()));
               s.msgRateIn = 4 * 75 * rnd.nextDouble();
               s.msgRateOut = 75000 - (4 * 75 * rnd.nextDouble());
               s.topics = i;
               s.consumerCount = i;
               s.producerCount = 4 * rnd.nextInt(375);
               s.cacheSize = 75000000 - (rnd.nextInt(4 * 75000));
               stats.add(s);
           }
   
           List<Map.Entry<String, ? extends Comparable>> bundleEntries = new ArrayList<>();
   
           for (NamespaceBundleStats s : stats) {
               bundleEntries.add(new AbstractMap.SimpleEntry<>("bundle-" + s.msgThroughputIn, s));
           }
           try {
               TopKBundles.partitionSort(bundleEntries, 100);
           } catch (IllegalArgumentException e) {
               // Verify the exception message contains the expected text
               assertTrue(e.getMessage().contains("Comparison method violates its general contract")
                               || e.getMessage().contains("transitivity")
                               || e.getMessage().contains("comparison"),
                       "Expected IllegalArgumentException about comparison contract violation, got: "
                               + e.getMessage());
           }
       }
   `
   
   ### Additional information
   
   **Problematic Code**:
   `public int compareByBandwidthIn(NamespaceBundleStats other) {
       if (Math.abs(this.msgThroughputIn - other.msgThroughputIn) > throughputDifferenceThreshold) {
           return Double.compare(this.msgThroughputIn, other.msgThroughputIn);
       }
       return 0;  // ← This causes transitivity violations
   }`
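   
   One possible direction, sketched here under stated assumptions and not as 
the project's chosen fix, is to compare a derived key instead of a pairwise 
difference. Mapping each value to a threshold-sized bucket keeps the "ignore 
small differences" intent and stays transitive, because the result is just an 
ordinary comparison of integers. The class name, `bucket`, `bucketedCompare`, 
and the `1e5` threshold are hypothetical:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Random;

   public class BucketedCompareSketch {
       // Assumed threshold for illustration only.
       public static final double THRESHOLD = 1e5;

       // Map a value to the index of the THRESHOLD-wide bucket that contains it.
       public static long bucket(double value) {
           return (long) Math.floor(value / THRESHOLD);
       }

       // Transitive by construction: it is a plain comparison of long keys.
       public static int bucketedCompare(double a, double b) {
           return Long.compare(bucket(a), bucket(b));
       }

       public static void main(String[] args) {
           Random rnd = new Random(0);
           List<Double> values = new ArrayList<>();
           for (int i = 0; i < 1000; i++) {
               values.add(4 * 75000 * rnd.nextDouble());
           }
           // Sorting with a consistent comparator does not trigger the contract check.
           values.sort((x, y) -> bucketedCompare(x, y));
           System.out.println("sorted " + values.size() + " values");
       }
   }
   ```
   
   The trade-off is that two values sitting just either side of a bucket 
boundary compare as different even when they are very close; dropping the 
threshold and using `Double.compare` directly would be the other simple way to 
keep the ordering consistent.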
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

