Copilot commented on code in PR #25104:
URL: https://github.com/apache/pulsar/pull/25104#discussion_r2642322114
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -204,6 +206,81 @@ public void testOpenCursorWithNullInitialPosition() throws
Exception {
assertEquals(cursor.getMarkDeletedPosition(),
ledger.getLastConfirmedEntry());
}
+ @Test
+ public void testConcurrentPropertyOperationsThreadSafety() throws
Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ ManagedLedger ledger =
factory.open("testConcurrentPropertyOperationsThreadSafety", config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor("c_concurrent", null);
+
+ int threadCount = 100;
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+ //Stress test with random operations for 5 seconds
Review Comment:
Missing space after comment marker. Should be "// Stress test" instead of
"//Stress test".
```suggestion
// Stress test with random operations for 5 seconds
```
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -204,6 +206,81 @@ public void testOpenCursorWithNullInitialPosition() throws
Exception {
assertEquals(cursor.getMarkDeletedPosition(),
ledger.getLastConfirmedEntry());
}
+ @Test
+ public void testConcurrentPropertyOperationsThreadSafety() throws
Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ ManagedLedger ledger =
factory.open("testConcurrentPropertyOperationsThreadSafety", config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor("c_concurrent", null);
+
+ int threadCount = 100;
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+ //Stress test with random operations for 5 seconds
+ long testEndTime = System.currentTimeMillis() + 5000;
+ AtomicLong totalOperations = new AtomicLong(0);
+ AtomicLong exceptionCount = new AtomicLong(0);
+ AtomicBoolean inconsistencyDetected = new AtomicBoolean(false);
+
+ ConcurrentHashMap<String, Long> records = new ConcurrentHashMap<>();
+ while (System.currentTimeMillis() < testEndTime) {
+ executor.submit(() -> {
+ try {
+ totalOperations.incrementAndGet();
+ Random random = new Random();
+ int operationType = random.nextInt(3);
+ String randomKey = "key" + random.nextInt(20);
+
+ switch (operationType) {
+ case 0: // Put operation
+ Long randomValue = random.nextLong();
+ cursor.putProperty(randomKey, randomValue);
+ records.put(randomKey, randomValue);
+ break;
+
+ case 1: // Remove operation
+ cursor.removeProperty(randomKey);
+ records.remove(randomKey);
+ break;
+
+ case 2: // Read and verify operation
+ Map<String, Long> properties =
cursor.getProperties();
+ // Verify no inconsistent state (key exists but
value is null)
+ if (properties.containsKey(randomKey) &&
properties.get(randomKey) == null) {
+ inconsistencyDetected.set(true);
+ fail("INCONSISTENT STATE DETECTED: Key '" +
randomKey + "' exists but has null value");
+ }
+ break;
+ }
+ } catch (ConcurrentModificationException cme) {
+ exceptionCount.incrementAndGet();
+ } catch (Exception e) {
+ exceptionCount.incrementAndGet();
+ fail("Unexpected exception: " + e.getMessage());
+ }
+ });
+ }
+
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+
+ // Then: Verify test results
+ // 1. No ConcurrentModificationException should occur with fixed code
+ assertEquals(exceptionCount.get(), 0, "No exceptions should occur with
thread-safe implementation");
+
+ // 2. No inconsistent states detected
+ assertFalse(inconsistencyDetected.get(), "No inconsistent states (key
with null value) should be detected");
+
+ // 3. Verify property data
+ Map<String, Long> finalProperties = cursor.getProperties();
+ assertEquals(records.size(), finalProperties.size());
+ try {
+ for (String key : finalProperties.keySet()) {
+ assertEquals(records.get(key), finalProperties.get(key),
"Values should match for key: " + key);
+ }
+ } catch (Exception e) {
+ fail("HashMap corruption detected: " + e.getMessage());
Review Comment:
This test has a race condition that undermines its reliability. The main
thread submits tasks to the executor in a tight loop while checking the time,
but immediately after the loop exits, it shuts down the executor and verifies
the final state. However, many tasks may still be queued or executing when
verification begins. Additionally, the parallel tracking using the 'records'
ConcurrentHashMap is fundamentally flawed because operations on 'cursor' and
'records' are not atomic as a pair - between a cursor.putProperty and
records.put, or between cursor.removeProperty and records.remove, other threads
can interleave operations, making the final assertion (lines 275-279)
unreliable.
The test should either:
1. Collect all Future objects from executor.submit() calls and wait for them
to complete before verification, or
2. Remove the 'records' tracking map and only verify that no exceptions
occur and that the cursor's internal state is consistent (no null values for
existing keys)
```suggestion
// 3. Verify cursor property data is internally consistent (no null
keys or values)
Map<String, Long> finalProperties = cursor.getProperties();
for (Map.Entry<String, Long> entry : finalProperties.entrySet()) {
assertNotNull(entry.getKey(), "Property key should not be null");
assertNotNull(entry.getValue(), "Property value should not be
null for key: " + entry.getKey());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]