lukecwik commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1095985769
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
}
Thread.sleep(100);
}
+
+ @Test
+ public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+ UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+ CountDownLatch done = new CountDownLatch(1);
+
+ ThreadPoolExecutor executor =
+ new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+ // Schedule 100 threads that are going to be scheduling work non-stop but sequentially.
+ for (int i = 0; i < 100; ++i) {
+ executor.execute(
+ () -> {
+ // Periodically check if done.
+ while (done.getCount() == 1) {
+ for (int j = 0; j < 100; ++j) {
+ try {
+ executorService.submit(() -> {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }).get();
+ } catch (InterruptedException | ExecutionException e) {
+ // Ignore, happens on executor shutdown.
+ }
+ }
+ }
+ });
+ }
+
+ Thread.sleep(20 * 1000);
Review Comment:
Instead of using a sleep, consider having each thread wait for a condition so that we know they are all running, and then have each one submit 1000 tasks.
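A minimal sketch of that suggestion, assuming a plain `ThreadPoolExecutor` over a `SynchronousQueue` as a stand-in for `UnboundedScheduledExecutorService` (whose internals are not reproduced here). `ContentionSketch`, `runContention` and `Result` are hypothetical names. Every submitter counts down a shared `CountDownLatch` and then waits on it, so all submitters are known to be running before any work is submitted; each then submits a fixed number of tasks sequentially. As an extra assumption beyond the comment, a second latch replaces the trailing fixed sleep as well.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ContentionSketch {
  /** Outcome of one run: tasks completed and the pool's high-water mark. */
  public static final class Result {
    public final int completed;
    public final int largestPoolSize;

    Result(int completed, int largestPoolSize) {
      this.completed = completed;
      this.largestPoolSize = largestPoolSize;
    }
  }

  public static Result runContention(int workers, int tasksPerWorker) {
    // Hypothetical stand-in for the pool inside UnboundedScheduledExecutorService.
    ThreadPoolExecutor underTest =
        new ThreadPoolExecutor(0, Integer.MAX_VALUE, 1, TimeUnit.HOURS, new SynchronousQueue<>());
    ExecutorService submitters = Executors.newFixedThreadPool(workers);
    CountDownLatch allRunning = new CountDownLatch(workers);
    CountDownLatch allDone = new CountDownLatch(workers);
    AtomicInteger completed = new AtomicInteger();

    for (int i = 0; i < workers; ++i) {
      submitters.execute(
          () -> {
            try {
              // Announce this submitter, then wait until every submitter is running.
              allRunning.countDown();
              allRunning.await();
              for (int j = 0; j < tasksPerWorker; ++j) {
                // get() keeps each submitter sequential: at most `workers` tasks run at once.
                underTest
                    .submit(
                        () -> {
                          try {
                            Thread.sleep(1);
                          } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                          }
                        })
                    .get();
                completed.incrementAndGet();
              }
            } catch (Exception e) {
              throw new RuntimeException(e);
            } finally {
              allDone.countDown();
            }
          });
    }

    try {
      // Wait on a condition rather than a fixed 20-second sleep.
      if (!allDone.await(1, TimeUnit.MINUTES)) {
        throw new AssertionError("submitters did not finish within a minute");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } finally {
      submitters.shutdown();
      underTest.shutdown();
    }
    return new Result(completed.get(), underTest.getLargestPoolSize());
  }

  public static void main(String[] args) {
    Result r = runContention(100, 1000);
    System.out.printf(
        "Completed %d tasks using at most %d threads%n", r.completed, r.largestPoolSize);
  }
}
```

With a plain `SynchronousQueue`, a task is handed off only to a thread already blocked in `poll()`, so a submitter that races ahead of a worker that has just finished can cause a new thread to be created. The reported pool size may therefore exceed the number of submitters, which is the behavior this test is probing.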
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
+ Thread.sleep(20 * 1000);
+ done.countDown();
+ executor.shutdown();
+ executor.awaitTermination(1, MINUTES);
+
+ int largestPool = executorService.threadPoolExecutor.getLargestPoolSize();
+ LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool);
+ assert(largestPool <= 100);
Review Comment:
`assert` here is a Java assert, not a JUnit test assertion.
Java asserts are no-ops unless the JVM runs with assertions enabled (`-ea`).
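A small stdlib-only sketch of the difference. `AssertionCheck`, `javaAssertsEnabled` and `checkAtMost` are hypothetical names used for illustration. In the test itself the usual fix would be a JUnit or Hamcrest assertion, for example `assertTrue(largestPool <= 100)` or `assertThat(largestPool, lessThanOrEqualTo(100))`, which run regardless of JVM flags.

```java
public class AssertionCheck {
  /** Returns true only when the JVM was started with assertions enabled (-ea). */
  static boolean javaAssertsEnabled() {
    boolean enabled = false;
    // The assignment inside this assert statement runs only when assertions are enabled.
    assert enabled = true;
    return enabled;
  }

  /** An explicit check that always runs, like a JUnit assertion. */
  static void checkAtMost(int actual, int limit) {
    if (actual > limit) {
      throw new AssertionError("expected at most " + limit + " but was " + actual);
    }
  }

  public static void main(String[] args) {
    int largestPool = 150; // hypothetical value that violates the bound
    try {
      // Evaluated only under -ea; otherwise the statement is skipped entirely.
      assert largestPool <= 100 : "largestPool was " + largestPool;
      System.out.println("Java assert skipped (assertions disabled)");
    } catch (AssertionError e) {
      System.out.println("Java assert fired: " + e.getMessage());
    }
    System.out.println("javaAssertsEnabled() = " + javaAssertsEnabled());
    try {
      checkAtMost(largestPool, 100);
    } catch (AssertionError e) {
      System.out.println("Explicit check fired: " + e.getMessage());
    }
  }
}
```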
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
new ThreadPoolExecutor(
0,
Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
- Long.MAX_VALUE,
- TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
- new SynchronousQueue<>(),
+ // Put a high-timeout on non-core threads. This reduces memory for per-thread caches
+ // over time.
+ 1,
+ TimeUnit.HOURS,
+ new SynchronousQueue<Runnable>() {
+ @Override
+ public boolean offer(Runnable r) {
+ try {
+ // By blocking for a little we hope to delay thread creation if there are existing
Review Comment:
This is a good short-term solution that makes tasks more likely to be paired up with existing threads, but if the system is very busy I'm not sure how much it will help.
I originally considered tracking the set of idle threads instead of using this queue of tasks, but that has higher communication overhead: you insert a thread when it finishes, remove it when you want to assign work to it, and then have to notify it to wake it up.
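A stripped-down sketch of the pattern in this hunk: a `SynchronousQueue` whose `offer(Runnable)` waits briefly for an idle worker before reporting failure, at which point `ThreadPoolExecutor` starts a new thread. The body of the PR's override is truncated in this excerpt, so the 100 µs wait and the names `DelayedHandoffPool`, `HANDOFF_WAIT_MICROS` and `runSequential` are assumptions for illustration only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DelayedHandoffPool {
  // Hypothetical handoff wait; the PR's actual value is not shown in this excerpt.
  static final long HANDOFF_WAIT_MICROS = 100;

  /** Builds an unbounded pool whose queue waits briefly for an idle thread. */
  public static ThreadPoolExecutor create() {
    SynchronousQueue<Runnable> queue =
        new SynchronousQueue<Runnable>() {
          @Override
          public boolean offer(Runnable r) {
            try {
              // Give an idle worker that is about to call poll() a chance to take
              // the task; returning false makes ThreadPoolExecutor add a thread.
              return offer(r, HANDOFF_WAIT_MICROS, TimeUnit.MICROSECONDS);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return false;
            }
          }
        };
    // Same shape as the hunk: no core threads, unbounded max, 1-hour keep-alive.
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 1, TimeUnit.HOURS, queue);
  }

  /** Runs {@code tasks} empty tasks one after another; returns the largest pool size. */
  public static int runSequential(int tasks) {
    ThreadPoolExecutor pool = create();
    try {
      for (int i = 0; i < tasks; ++i) {
        pool.submit(() -> {}).get();
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
    }
    return pool.getLargestPoolSize();
  }

  public static void main(String[] args) {
    System.out.println("Largest pool size: " + runSequential(1000));
  }
}
```

This also illustrates the concern above: when every thread is busy, each `offer` waits out the full timeout before a thread is created, so the submitter pays extra latency without any thread being reused.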
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
new ThreadPoolExecutor(
0,
Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
- Long.MAX_VALUE,
Review Comment:
Note that these were the original parameters when we didn't have a scheduled
executor service.
--
This is an automated message from the Apache Git Service.