mynameborat commented on code in PR #1667:
URL: https://github.com/apache/samza/pull/1667#discussion_r1227068381


##########
samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java:
##########
@@ -37,4 +45,36 @@ public ExecutorService getTaskExecutor(Config config) {
     return Executors.newFixedThreadPool(threadPoolSize,
         new ThreadFactoryBuilder().setNameFormat("Samza Container Thread-%d").build());
   }
+
+  /**
+   * {@inheritDoc}
+   *
+   * The choice of thread pool is determined based on the following logic
+   *    If job.operator.thread.pool.enabled,
+   *     a. Use {@link #getTaskExecutor(Config)} if job.container.thread.pool.size > 1
+   *     b. Use default single threaded pool otherwise
+   * <b>Note:</b> The default single threaded pool used is a substitute for the scenario where container thread pool is null and
+   * the messages are dispatched on runloop thread. We can't have the stages schedule on the run loop thread and hence
+   * the fallback to use a single threaded executor across all tasks.
+   */
+  @Override
+  public ExecutorService getOperatorExecutor(TaskName taskName, Config config) {
+    ExecutorService taskExecutor = TASK_EXECUTORS.computeIfAbsent(taskName, key -> {
+      final int threadPoolSize = new JobConfig(config).getThreadPoolSize();
+      ExecutorService operatorThreadPool;
+
+      if (threadPoolSize > 1) {
+        LOG.info("Using container thread pool as operator thread pool for task {}", key.getTaskName());
+        operatorThreadPool = getTaskExecutor(config);

Review Comment:
   Separating out DI and the actual usage of thread pools is the goal. The 
first iteration falls back to the container thread pool. This would change with 
different implementations. The flexibility to choose which thread pool to use 
is the goal of this PR, and using the taskExecutor directly prevents that.
   
   The default implementation here is not necessarily the implementation we 
will use within LI.
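
   To make the point concrete, here is a rough sketch of the separation described above, using only JDK classes. The names `OperatorExecutorFactory`, `FallbackExecutorFactory`, `DedicatedExecutorFactory` and `OperatorRunner` are hypothetical stand-ins, not Samza's actual interfaces, and a plain `int` replaces `Config` for brevity. The caller only asks the factory for an operator executor, so the choice of pool can change without touching the caller.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical factory contract (an assumption for illustration, not Samza's API).
interface OperatorExecutorFactory {
  ExecutorService getTaskExecutor(int threadPoolSize);

  ExecutorService getOperatorExecutor(String taskName, int threadPoolSize);
}

// First-iteration style wiring: fall back to the container-style pool when
// the configured size is > 1, otherwise use a single-threaded executor.
class FallbackExecutorFactory implements OperatorExecutorFactory {
  private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();

  @Override
  public ExecutorService getTaskExecutor(int threadPoolSize) {
    return Executors.newFixedThreadPool(threadPoolSize);
  }

  @Override
  public ExecutorService getOperatorExecutor(String taskName, int threadPoolSize) {
    // One executor per task name, created lazily and then reused.
    return executors.computeIfAbsent(taskName, key ->
        threadPoolSize > 1
            ? getTaskExecutor(threadPoolSize)
            : Executors.newSingleThreadExecutor());
  }
}

// A different wiring: operators always get a dedicated pool per task,
// regardless of the container pool size. Callers are unaffected.
class DedicatedExecutorFactory implements OperatorExecutorFactory {
  private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();

  @Override
  public ExecutorService getTaskExecutor(int threadPoolSize) {
    return Executors.newFixedThreadPool(threadPoolSize);
  }

  @Override
  public ExecutorService getOperatorExecutor(String taskName, int threadPoolSize) {
    return executors.computeIfAbsent(taskName, key -> Executors.newFixedThreadPool(2));
  }
}

// The usage side depends only on the factory contract, never on a concrete pool.
class OperatorRunner {
  static int runOn(OperatorExecutorFactory factory, String taskName, int threadPoolSize) {
    ExecutorService executor = factory.getOperatorExecutor(taskName, threadPoolSize);
    try {
      return executor.submit(() -> 21 * 2).get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      executor.shutdown();
    }
  }
}
```

   If the caller reached for the task executor directly, swapping `FallbackExecutorFactory` for `DedicatedExecutorFactory` would have no effect on where operators run, which is the flexibility this PR is after.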


