ChenSammi commented on PR #8360:
URL: https://github.com/apache/ozone/pull/8360#issuecomment-2862785627
> My initial guess was that this result was caused by the random object, but
it turns out it may have just been affected by some outliers.
Yes, replacing Random with ThreadLocalRandom has greatly improved the
performance in my test. After replacing it with ThreadLocalRandom, we can
achieve almost the same performance as without synchronized for
CapacityVolumeChoosingPolicy. Could you implement this optimization in a new
commit?
This is the version I used, based on your code, where a single
ExecutorService is used for the entire test, and local int variables are used
to record the success and failure counts in each thread before aggregating
them into the global counters.
```
package org.apache.hadoop.ozone.container.common.volume;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
import org.apache.hadoop.hdds.fs.MockSpaceUsageSource;
import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
import org.apache.hadoop.hdds.fs.SpaceUsagePersistence;
import org.apache.hadoop.hdds.fs.SpaceUsageSource;
import
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestVolumeChoosingPolicyPerformance {
private static final OzoneConfiguration CONF = new OzoneConfiguration();
private static final int NUM_VOLUMES = 10;
private static final int NUM_THREADS = 100;
private static final int NUM_ITERATIONS = 1000000;
private static final long CONTAINER_SIZE = 1;
@TempDir
private Path baseDir;
private List<HddsVolume> volumes;
private CapacityVolumeChoosingPolicy capacityPolicy;
private RoundRobinVolumeChoosingPolicy roundRobinPolicy;
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
// @BeforeEach
public void setup() throws Exception {
volumes = new ArrayList<>();
capacityPolicy = new CapacityVolumeChoosingPolicy();
roundRobinPolicy = new RoundRobinVolumeChoosingPolicy();
// Create NUM_VOLUMES volumes with different available space
for (int i = 0; i < NUM_VOLUMES; i++) {
String volumePath = baseDir + "/disk" + i;
// Each volume has 1000 bytes capacity and different available space
SpaceUsageSource source = MockSpaceUsageSource.fixed(1000000000,
1000000000 - i * 50);
SpaceUsageCheckFactory factory = MockSpaceUsageCheckFactory.of(
source, Duration.ZERO, SpaceUsagePersistence.None.INSTANCE);
HddsVolume volume = new HddsVolume.Builder(volumePath)
.conf(CONF)
.usageCheckFactory(factory)
.build();
volumes.add(volume);
}
}
// @AfterEach
public void cleanUp() {
volumes.forEach(HddsVolume::shutdown);
}
@Test
public void testConcurrentVolumeChoosing() throws Exception {
// Test both policies
for (int i = 0; i < 5; i++) {
setup();
testPolicyPerformance("CapacityVolumeChoosingPolicy", capacityPolicy);
cleanUp();
setup();
testPolicyPerformance("RoundRobinVolumeChoosingPolicy",
roundRobinPolicy);
cleanUp();
}
}
private void testPolicyPerformance(String policyName, VolumeChoosingPolicy
policy) throws Exception {
CountDownLatch latch = new CountDownLatch(NUM_THREADS);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failureCount = new AtomicInteger(0);
AtomicLong totalTimeNanos = new AtomicLong(0);
// Submit NUM_THREADS tasks, each doing NUM_ITERATIONS volume choices
for (int i = 0; i < NUM_THREADS; i++) {
executor.submit(() -> {
try {
long threadStartTime = System.nanoTime();
int success = 0;
int failures = 0;
for (int j = 0; j < NUM_ITERATIONS; j++) {
try {
HddsVolume volume = policy.chooseVolume(volumes,
CONTAINER_SIZE);
if (volume != null) {
success++;
}
} catch (Exception e) {
failures++;
}
}
long threadEndTime = System.nanoTime();
totalTimeNanos.addAndGet((threadEndTime - threadStartTime));
successCount.addAndGet(success);
failureCount.addAndGet(failures);
} finally {
latch.countDown();
}
});
}
// Wait for all tasks to complete
assertTrue(latch.await(5, TimeUnit.MINUTES), "Test timed out");
long totalOperations = (long) NUM_THREADS * NUM_ITERATIONS;
double avgTimePerOperation = (double) totalTimeNanos.get() /
totalOperations;
double operationsPerSecond = totalOperations / (totalTimeNanos.get() /
1_000_000_000.0);
System.out.println("Performance results for " + policyName);
System.out.println("Total operations: " + totalOperations);
System.out.println("Successful operations: " + successCount.get());
System.out.println("Failed operations: " + failureCount.get());
System.out.println("Total time: " + totalTimeNanos.get() / 1_000_000 + "
ms");
System.out.println("Average time per operation: " + avgTimePerOperation
+ " ns");
System.out.println("Operations per second: " + operationsPerSecond);
System.out.println("");
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]