lukecwik commented on code in PR #25300:
URL: https://github.com/apache/beam/pull/25300#discussion_r1095985769


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+    UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+    // Schedule 1000 threads that are going to be scheduling work non-stop but sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);

Review Comment:
   Instead of sleeping for a fixed 20 seconds, consider having each thread wait
   on a shared condition so we know they are all running, and then have each
   thread submit 1000 tasks.
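
A rough sketch of what that could look like, using only `java.util.concurrent`. The class name `LatchCoordinatedSubmitters`, the `run` helper, and the plain `ThreadPoolExecutor` standing in for `UnboundedScheduledExecutorService` are all assumptions made for illustration, not code from this PR:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: submitters start together, then each submits a fixed number of tasks. */
final class LatchCoordinatedSubmitters {

  /** Returns the largest pool size the stand-in executor reached. */
  static int run(int numSubmitters, int tasksPerSubmitter) {
    // Stand-in for the executor under test (assumption: a plain cached-style pool).
    ThreadPoolExecutor underTest =
        new ThreadPoolExecutor(0, Integer.MAX_VALUE, 1, TimeUnit.MINUTES, new SynchronousQueue<>());
    ExecutorService submitters = Executors.newFixedThreadPool(numSubmitters);
    CountDownLatch allRunning = new CountDownLatch(numSubmitters);
    CountDownLatch allDone = new CountDownLatch(numSubmitters);
    try {
      for (int i = 0; i < numSubmitters; ++i) {
        submitters.execute(
            () -> {
              try {
                // Signal readiness, then wait until every submitter is running.
                allRunning.countDown();
                allRunning.await();
                // Submit a bounded amount of work, one task at a time.
                for (int j = 0; j < tasksPerSubmitter; ++j) {
                  underTest.submit(() -> {}).get();
                }
              } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
              } finally {
                allDone.countDown();
              }
            });
      }
      // Wait for completion instead of sleeping for a fixed duration.
      if (!allDone.await(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("Submitters did not finish in time");
      }
      return underTest.getLargestPoolSize();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      submitters.shutdownNow();
      underTest.shutdownNow();
    }
  }
}
```

With this shape the test finishes as soon as the work is done, and the returned value can be compared against the number of submitters.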



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java:
##########
@@ -502,4 +509,45 @@ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) thro
     }
     Thread.sleep(100);
   }
+
+  @Test
+  public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
+    UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+    CountDownLatch done = new CountDownLatch(1);
+
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(100, 100, Long.MAX_VALUE, MILLISECONDS, new SynchronousQueue<>());
+    // Schedule 1000 threads that are going to be scheduling work non-stop but sequentially.
+    for (int i = 0; i < 100; ++i) {
+      executor.execute(
+          () -> {
+            // Periodically check if done.
+            while (done.getCount() == 1) {
+              for (int j = 0; j < 100; ++j) {
+                try {
+                  executorService.submit(() -> {
+                    try {
+                      Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }).get();
+                } catch (InterruptedException | ExecutionException e) {
+                  // Ignore, happens on executor shutdown.
+                }
+              }
+            }
+          });
+    }
+
+    Thread.sleep(20 * 1000);
+    done.countDown();
+    executor.shutdown();
+    executor.awaitTermination(1, MINUTES);
+
+    int largestPool = executorService.threadPoolExecutor.getLargestPoolSize();
+    LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool);
+    assert(largestPool <= 100);

Review Comment:
   `assert` is a Java language assertion, not a JUnit test assertion.
   
   Java assertions are no-ops unless the JVM is run with assertions enabled
   (`-ea`).
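
To illustrate the difference, here is a small self-contained sketch. `AssertDemo` and its methods are hypothetical names; `checkAtMost` only stands in for a test-framework assertion such as JUnit's `assertTrue`, which is not used here so that the example compiles without extra dependencies:

```java
/** Hypothetical demo of Java's `assert` statement versus an always-on check. */
final class AssertDemo {

  /** Returns true only if the JVM enabled assertions for this class (e.g. via -ea). */
  static boolean assertionsEnabled() {
    boolean enabled = false;
    // The assignment inside the assert runs only when assertions are enabled.
    assert enabled = true;
    return enabled;
  }

  /** Always-on check: throws regardless of JVM flags, like a test-framework assertion. */
  static void checkAtMost(int actual, int limit) {
    if (actual > limit) {
      throw new AssertionError("expected at most " + limit + " but was " + actual);
    }
  }

  /** Returns true if a plain `assert` detected that actual exceeds limit. */
  static boolean javaAssertCatches(int actual, int limit) {
    try {
      assert actual <= limit;
      // Reached when the condition holds, or when assertions are disabled.
      return false;
    } catch (AssertionError e) {
      return true;
    }
  }
}
```

With assertions disabled, `javaAssertCatches(150, 100)` returns `false` even though the condition is violated, whereas `checkAtMost(150, 100)` always throws.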



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
         new ThreadPoolExecutor(
             0,
             Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-            Long.MAX_VALUE,
-            TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
-            new SynchronousQueue<>(),
+            // Put a high-timeout on non-core threads. This reduces memory for per-thread caches
+            // over time.
+            1,
+            TimeUnit.HOURS,
+            new SynchronousQueue<Runnable>() {
+              @Override
+              public boolean offer(Runnable r) {
+                try {
+                  // By blocking for a little we hope to delay thread creation if there are existing

Review Comment:
   This is a good short-term solution for making tasks more likely to be paired
   up with existing threads, but if the system is very busy I'm not sure how
   much it will help.
   
   I had originally thought of storing the set of unused, available threads
   instead of using this queue of tasks. That has higher communication
   overhead, though: you insert threads when they finish, pull them out when
   you want to assign work, and then have to notify them to wake them up.
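
For readers following along, a minimal sketch of the "block briefly in `offer`" idea, built on a plain `ThreadPoolExecutor`. The class name `BriefHandoffPool`, the `handoffWaitMillis` parameter, and the helper method are assumptions for illustration; the hunk above is truncated, so this is not the PR's actual implementation or timeout:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical pool that waits briefly for an idle worker before growing. */
final class BriefHandoffPool {

  static ThreadPoolExecutor create(long handoffWaitMillis) {
    return new ThreadPoolExecutor(
        0,
        Integer.MAX_VALUE,
        1,
        TimeUnit.HOURS,
        new SynchronousQueue<Runnable>() {
          @Override
          public boolean offer(Runnable r) {
            try {
              // Give an existing worker a short window to pick up the task.
              // Returning false makes the executor start a new thread.
              return offer(r, handoffWaitMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return false;
            }
          }
        });
  }

  /** Runs tasks one after another and reports the largest pool size reached. */
  static int largestPoolAfterSequentialTasks(long handoffWaitMillis, int tasks) {
    ThreadPoolExecutor pool = create(handoffWaitMillis);
    try {
      for (int i = 0; i < tasks; ++i) {
        pool.submit(() -> {}).get();
      }
      return pool.getLargestPoolSize();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
  }
}
```

The trade-off is that every submission that finds no idle worker pays the wait before a thread is created, which is why a short timeout is assumed here.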



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -201,9 +201,26 @@ public UnboundedScheduledExecutorService() {
         new ThreadPoolExecutor(
             0,
             Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-            Long.MAX_VALUE,

Review Comment:
   Note that these were the original parameters when we didn't have a scheduled 
executor service.


