nsivabalan commented on code in PR #18585:
URL: https://github.com/apache/hudi/pull/18585#discussion_r3184298283


##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java:
##########
@@ -63,6 +69,69 @@
  */
 public final class HoodieLocalEngineContext extends HoodieEngineContext {
 
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLocalEngineContext.class);
+
+  // When running in Java 11+, the common ForkJoinPool's workers don't inherit the application
+  // classloader, causing ClassNotFoundExceptions. We use a custom pool whose thread factory
+  // explicitly sets the correct classloader.
+  // See: https://stackoverflow.com/questions/66240365/java-11-upgrade-from-8-parallel-streams-throws-classnotfoundexception
+  private static final ForkJoinPool FORK_JOIN_POOL = initForkJoinPool();
+
+  private static ForkJoinPool initForkJoinPool() {
+    int javaVersion = 0;
+    try {
+      String specVersion = System.getProperty("java.specification.version");
+      // Pre-Java 9: "1.X" format; Java 9+: "11", "17", "21", etc.
+      javaVersion = specVersion.startsWith("1.")
+          ? Integer.parseInt(specVersion.split("\\.")[1])
+          : Integer.parseInt(specVersion.split("\\.")[0]);
+    } catch (NumberFormatException e) {
+      // Ignore, treat as pre-11
+    }
+    ForkJoinPool commonPool = ForkJoinPool.commonPool();
+    if (javaVersion >= 11) {
+      ForkJoinPool pool = new ForkJoinPool(commonPool.getParallelism(), makeWorkerThreadFactory(), null, commonPool.getAsyncMode());
+      LOG.info("Using custom fork-join pool: java version={}, #threads={}, asyncMode={}",
+          javaVersion, pool.getParallelism(), pool.getAsyncMode());
+      return pool;
+    }
+    LOG.info("Using common fork-join pool: java version={}, #threads={}, asyncMode={}",
+        javaVersion, commonPool.getParallelism(), commonPool.getAsyncMode());
+    return commonPool;
+  }
+
+  private static ForkJoinPool.ForkJoinWorkerThreadFactory makeWorkerThreadFactory() {
+    final String prefix = "hoodie-local-engine-context-pool-worker-";
+    return pool -> {
+      final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+      worker.setName(prefix + worker.getPoolIndex());
+      worker.setContextClassLoader(HoodieLocalEngineContext.class.getClassLoader());
+      LOG.info("Creating worker thread {} with class loader {}", worker.getName(), worker.getContextClassLoader());

Review Comment:
    LOG.info in thread factory fires for every worker thread creation
   
     File: HoodieLocalEngineContext.java, makeWorkerThreadFactory()
   
     LOG.info("Creating worker thread {} with class loader {}", worker.getName(), worker.getContextClassLoader());
   
     This logs at INFO level every time a worker thread is created. With the default parallelism matching the available cores, this could produce 8-64+ log lines per burst of parallel work. Because the pool starts workers lazily as tasks are submitted and retires idle ones, the lines can also recur over the life of the process rather than appearing once. This should be LOG.debug.



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java:
##########
@@ -93,7 +162,8 @@ public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
 
   @Override
   public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
-    return data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
+    // parallelism is advisory; actual pool size is fixed at class-load time via FORK_JOIN_POOL

Review Comment:
    map() now changes behavior for ALL callers, not just validators
   
     File: HoodieLocalEngineContext.java:164-166
   
     public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
         return mapParallel(data, func);
     }
   
     Every caller of HoodieLocalEngineContext.map(), not just SparkValidatorUtils, now runs on the custom pool instead of the common pool (on Java 11+, where FORK_JOIN_POOL is the custom pool). This includes metadata table operations, file listing, and other internal uses. The behavioral change is subtle: work that previously ran on the JVM-managed common pool now runs on a Hudi-managed pool with the same parallelism. This is arguably better (classloader safety everywhere), but it broadens the blast radius beyond the stated fix.
   
   Is that intentional?
   If so, why only the `map` calls and not all the other methods overridden in HoodieLocalEngineContext?
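     If it is intentional, one way to cover every overridden method is a single choke point that runs the parallel stream inside the custom pool and that each method reuses. A minimal sketch, assuming hypothetical names (PooledParallelOps, runOnPool, and a forEach standing in for "the other methods") and plain java.util.function types instead of Hudi's Serializable* interfaces. It relies on observed JDK behavior, not a documented guarantee: a parallel stream started from a ForkJoinPool worker forks its subtasks into that worker's pool.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical names; not Hudi APIs.
final class PooledParallelOps {
  static final String PREFIX = "pooled-ops-worker-";

  // Same shape as the PR's pool: common-pool parallelism plus a custom thread factory.
  private static final ForkJoinPool POOL = new ForkJoinPool(
      ForkJoinPool.commonPool().getParallelism(),
      pool -> {
        ForkJoinWorkerThread t = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        t.setName(PREFIX + t.getPoolIndex());
        t.setContextClassLoader(PooledParallelOps.class.getClassLoader());
        return t;
      },
      null,
      false);

  // Single choke point: every parallel operation is submitted to POOL, so the
  // stream's subtasks are forked from a POOL worker (observed JDK behavior).
  private static <T> T runOnPool(Supplier<T> work) {
    Callable<T> task = work::get;
    return POOL.submit(task).join();
  }

  static <I, O> List<O> map(List<I> data, Function<I, O> func) {
    return runOnPool(() -> data.parallelStream().map(func).collect(Collectors.toList()));
  }

  // Any other parallel method goes through the same helper.
  static <I> void forEach(List<I> data, Consumer<I> consumer) {
    runOnPool(() -> {
      data.parallelStream().forEach(consumer);
      return null;
    });
  }
}
```

     With one helper, the choice of pool (and any future change to it) lives in one place instead of being repeated per method.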



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java:
##########
@@ -63,6 +69,69 @@
  */
 public final class HoodieLocalEngineContext extends HoodieEngineContext {
 
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLocalEngineContext.class);
+
+  // When running in Java 11+, the common ForkJoinPool's workers don't inherit the application
+  // classloader, causing ClassNotFoundExceptions. We use a custom pool whose thread factory
+  // explicitly sets the correct classloader.
+  // See: https://stackoverflow.com/questions/66240365/java-11-upgrade-from-8-parallel-streams-throws-classnotfoundexception
+  private static final ForkJoinPool FORK_JOIN_POOL = initForkJoinPool();
+
+  private static ForkJoinPool initForkJoinPool() {
+    int javaVersion = 0;
+    try {
+      String specVersion = System.getProperty("java.specification.version");
+      // Pre-Java 9: "1.X" format; Java 9+: "11", "17", "21", etc.
+      javaVersion = specVersion.startsWith("1.")

Review Comment:
   Consider using Runtime.version() on Java 9+ instead of parsing java.specification.version
   
     File: HoodieLocalEngineContext.java, initForkJoinPool()
   
     The version parsing logic works but is more complex than needed. Since Hudi's minimum supported Java is 8, you could simplify: if java.specification.version starts with "1.", it is Java 8 and the custom pool is not needed; otherwise it is Java 9+, and only the leading integer has to be parsed and compared against 11. The current code is correct, just noting there's a simpler path.
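     A sketch of the simpler check, assuming a hypothetical helper name. Runtime.version() only exists on Java 9+ (and Runtime.Version.feature() on 10+), so while the build still targets Java 8 a string check like this avoids reflection:

```java
// Hypothetical helper, not part of Hudi. Takes the value of the
// "java.specification.version" system property.
final class JavaVersionCheck {
  static boolean isJava11OrLater(String specVersion) {
    // Java 8 and earlier report "1.x" (e.g. "1.8"): no custom pool needed.
    if (specVersion == null || specVersion.startsWith("1.")) {
      return false;
    }
    // Java 9+ report the feature release first ("9", "11", "17", "21").
    int dot = specVersion.indexOf('.');
    String feature = dot < 0 ? specVersion : specVersion.substring(0, dot);
    try {
      return Integer.parseInt(feature) >= 11;
    } catch (NumberFormatException e) {
      return false; // unparseable: fall back to the common pool, as the PR does
    }
  }
}
```

     Usage would be JavaVersionCheck.isJava11OrLater(System.getProperty("java.specification.version")). On a Java 10+ runtime, Runtime.version().feature() >= 11 is the equivalent check, but it is not available on a Java 8 runtime.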



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java:
##########
@@ -63,6 +69,69 @@
  */
 public final class HoodieLocalEngineContext extends HoodieEngineContext {
 
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLocalEngineContext.class);
+
+  // When running in Java 11+, the common ForkJoinPool's workers don't inherit the application
+  // classloader, causing ClassNotFoundExceptions. We use a custom pool whose thread factory
+  // explicitly sets the correct classloader.
+  // See: https://stackoverflow.com/questions/66240365/java-11-upgrade-from-8-parallel-streams-throws-classnotfoundexception
+  private static final ForkJoinPool FORK_JOIN_POOL = initForkJoinPool();

Review Comment:
   The static field initializes the pool at class-load time, meaning any code that touches HoodieLocalEngineContext for any reason (even just constructing an instance for non-parallel work) will eagerly run initForkJoinPool() and construct a custom ForkJoinPool, even though its worker threads are only started once tasks are submitted.
   
     A lazy holder pattern would be cleaner:
   
     private static class PoolHolder {
         static final ForkJoinPool INSTANCE = initForkJoinPool();
     }
   
     // Access via PoolHolder.INSTANCE; it is only initialized on first use
   
     This is thread-safe (guaranteed by JLS class initialization), zero-overhead, and avoids constructing the pool at all in code paths that never call map() or mapParallel().
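     A runnable sketch of the holder idiom, assuming hypothetical names: LazyPoolContext stands in for HoodieLocalEngineContext, initForkJoinPool() is a simplified stand-in for the PR's method, and the counter exists only to make the laziness observable.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical names; a simplified stand-in for HoodieLocalEngineContext.
final class LazyPoolContext {
  // Counts pool constructions so a test can observe when initialization happens.
  static final AtomicInteger POOLS_CREATED = new AtomicInteger();

  private static ForkJoinPool initForkJoinPool() {
    POOLS_CREATED.incrementAndGet();
    return new ForkJoinPool(ForkJoinPool.commonPool().getParallelism());
  }

  // The JVM initializes PoolHolder (running initForkJoinPool()) only on the first
  // read of PoolHolder.INSTANCE; class initialization is thread-safe per the JLS.
  private static final class PoolHolder {
    static final ForkJoinPool INSTANCE = initForkJoinPool();
  }

  static ForkJoinPool pool() {
    return PoolHolder.INSTANCE;
  }

  // A non-parallel method that never touches the pool.
  static int size(List<?> data) {
    return data.size();
  }
}
```

     Initializing LazyPoolContext (for example by calling size()) leaves POOLS_CREATED at 0; the first pool() call creates exactly one pool, and later calls return the same instance.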



-- 
This is an automated message from the Apache Git Service.