nsivabalan commented on code in PR #18585:
URL: https://github.com/apache/hudi/pull/18585#discussion_r3184298283
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java:
##########
@@ -63,6 +69,69 @@
*/
public final class HoodieLocalEngineContext extends HoodieEngineContext {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLocalEngineContext.class);
+
+  // When running in Java 11+, the common ForkJoinPool's workers don't inherit the application
+  // classloader, causing ClassNotFoundExceptions. We use a custom pool whose thread factory
+  // explicitly sets the correct classloader.
+  // See: https://stackoverflow.com/questions/66240365/java-11-upgrade-from-8-parallel-streams-throws-classnotfoundexception
+  private static final ForkJoinPool FORK_JOIN_POOL = initForkJoinPool();
+
+  private static ForkJoinPool initForkJoinPool() {
+    int javaVersion = 0;
+    try {
+      String specVersion = System.getProperty("java.specification.version");
+      // Pre-Java 9: "1.X" format; Java 9+: "11", "17", "21", etc.
+      javaVersion = specVersion.startsWith("1.")
+          ? Integer.parseInt(specVersion.split("\\.")[1])
+          : Integer.parseInt(specVersion.split("\\.")[0]);
+    } catch (NumberFormatException e) {
+      // Ignore, treat as pre-11
+    }
+    ForkJoinPool commonPool = ForkJoinPool.commonPool();
+    if (javaVersion >= 11) {
+      ForkJoinPool pool = new ForkJoinPool(commonPool.getParallelism(), makeWorkerThreadFactory(), null, commonPool.getAsyncMode());
+      LOG.info("Using custom fork-join pool: java version={}, #threads={}, asyncMode={}",
+          javaVersion, pool.getParallelism(), pool.getAsyncMode());
+      return pool;
+    }
+    LOG.info("Using common fork-join pool: java version={}, #threads={}, asyncMode={}",
+        javaVersion, commonPool.getParallelism(), commonPool.getAsyncMode());
+    return commonPool;
+  }
+
+  private static ForkJoinPool.ForkJoinWorkerThreadFactory makeWorkerThreadFactory() {
+    final String prefix = "hoodie-local-engine-context-pool-worker-";
+    return pool -> {
+      final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+      worker.setName(prefix + worker.getPoolIndex());
+      worker.setContextClassLoader(HoodieLocalEngineContext.class.getClassLoader());
+      LOG.info("Creating worker thread {} with class loader {}", worker.getName(), worker.getContextClassLoader());
Review Comment:
LOG.info in thread factory fires for every worker thread creation
File: HoodieLocalEngineContext.java, makeWorkerThreadFactory()
  LOG.info("Creating worker thread {} with class loader {}",
      worker.getName(), worker.getContextClassLoader());
This logs at INFO level every time a worker thread is created. With the
default parallelism matching available cores, this could produce 8-64+ log
lines as the pool starts its workers. This should be LOG.debug.
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java:
##########
@@ -93,7 +162,8 @@ public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
   @Override
   public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
-    return data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
+    // parallelism is advisory; actual pool size is fixed at class-load time via FORK_JOIN_POOL
Review Comment:
map() now changes behavior for ALL callers, not just validators
File: HoodieLocalEngineContext.java:164-166
  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
    return mapParallel(data, func);
  }
Every caller of HoodieLocalEngineContext.map(), not just
SparkValidatorUtils, now runs on the custom pool instead of the common pool
whenever the JVM is Java 11 or later.
This includes metadata table operations, file listing, and other
internal uses. The behavioral change is subtle: work that previously ran
on the JVM-managed common pool now runs on a Hudi-managed pool with the same
parallelism. This is arguably better (classloader safety
everywhere), but it broadens the blast radius beyond the stated fix.
Is that intentional?
If yes, why route only `map` calls through the custom pool and not the other
parallel methods that HoodieLocalEngineContext overrides?
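For reference, the usual way to move a parallel stream off the common pool is to start it from a task submitted to a dedicated pool. The following is only a minimal sketch of that idiom, not the PR's actual `mapParallel`: the class name, the fixed parallelism of 4, and the exception handling are assumptions made for illustration. It also relies on the observed OpenJDK behavior that a parallel stream started inside a ForkJoinPool task runs on that pool, which the streams API does not formally guarantee.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch; not the implementation in the PR.
final class CustomPoolMapSketch {
  // Dedicated pool. The parallelism of 4 is an arbitrary value for illustration.
  private static final ForkJoinPool POOL = new ForkJoinPool(4);

  private CustomPoolMapSketch() {
  }

  static <I, O> List<O> mapParallel(List<I> data, Function<I, O> func) {
    try {
      // The stream pipeline is started from a task running on POOL, so its
      // subtasks are forked onto POOL rather than ForkJoinPool.commonPool().
      return POOL.submit(() -> data.parallelStream().map(func).collect(Collectors.toList())).get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag before surfacing the failure.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while mapping", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Mapping failed", e.getCause());
    }
  }
}
```

Any other method that builds its own `parallelStream()` without going through such a wrapper would still run on the common pool, which is the inconsistency the question above is getting at.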
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java:
##########
@@ -63,6 +69,69 @@
*/
public final class HoodieLocalEngineContext extends HoodieEngineContext {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLocalEngineContext.class);
+
+  // When running in Java 11+, the common ForkJoinPool's workers don't inherit the application
+  // classloader, causing ClassNotFoundExceptions. We use a custom pool whose thread factory
+  // explicitly sets the correct classloader.
+  // See: https://stackoverflow.com/questions/66240365/java-11-upgrade-from-8-parallel-streams-throws-classnotfoundexception
+  private static final ForkJoinPool FORK_JOIN_POOL = initForkJoinPool();
+
+  private static ForkJoinPool initForkJoinPool() {
+    int javaVersion = 0;
+    try {
+      String specVersion = System.getProperty("java.specification.version");
+      // Pre-Java 9: "1.X" format; Java 9+: "11", "17", "21", etc.
+      javaVersion = specVersion.startsWith("1.")
Review Comment:
Consider using Runtime.version() on Java 9+ instead of parsing
java.specification.version
File: HoodieLocalEngineContext.java, initForkJoinPool()
The version parsing logic works but is more complex than needed. Since
Hudi's minimum supported Java is 8, the only "1.X" value that can occur is
"1.8", so you could simplify: if java.specification.version starts with
"1.", treat it as pre-11; otherwise parse the leading integer and compare it
with 11. Note that Runtime.version() only exists on Java 9+, so with a Java 8
baseline it would need a guarded or reflective call. The current code is
correct, just noting there's a simpler path.
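One possible shape for the simpler path is sketched below. The class and method names are hypothetical and not part of the PR; the sketch only assumes the two documented formats of `java.specification.version` ("1.X" before Java 9, a plain feature number afterwards) and returns 0 for anything it cannot parse, mirroring the "treat as pre-11" fallback in the diff.

```java
// Hypothetical helper, not part of the PR.
final class JavaVersionSketch {
  private JavaVersionSketch() {
  }

  /**
   * Returns the feature version (8, 11, 17, ...) for a java.specification.version
   * value, or 0 when the value is missing or cannot be parsed.
   */
  static int parseFeatureVersion(String specVersion) {
    if (specVersion == null) {
      return 0;
    }
    // Pre-Java 9 values look like "1.8": drop the "1." prefix.
    String s = specVersion.startsWith("1.") ? specVersion.substring(2) : specVersion;
    // Keep only the leading numeric component in case more dots follow.
    int dot = s.indexOf('.');
    if (dot >= 0) {
      s = s.substring(0, dot);
    }
    try {
      return Integer.parseInt(s);
    } catch (NumberFormatException e) {
      return 0; // unknown format: treat as pre-11
    }
  }

  static boolean isJava11OrLater() {
    return parseFeatureVersion(System.getProperty("java.specification.version")) >= 11;
  }
}
```

Unlike the code in the diff, this variant also tolerates a null property value instead of throwing a NullPointerException, although the JVM always sets that property in practice.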
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java:
##########
@@ -63,6 +69,69 @@
*/
public final class HoodieLocalEngineContext extends HoodieEngineContext {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLocalEngineContext.class);
+
+  // When running in Java 11+, the common ForkJoinPool's workers don't inherit the application
+  // classloader, causing ClassNotFoundExceptions. We use a custom pool whose thread factory
+  // explicitly sets the correct classloader.
+  // See: https://stackoverflow.com/questions/66240365/java-11-upgrade-from-8-parallel-streams-throws-classnotfoundexception
+  private static final ForkJoinPool FORK_JOIN_POOL = initForkJoinPool();
Review Comment:
The static field initializes the pool at class-initialization time, meaning
any code that touches HoodieLocalEngineContext for any reason (even just
constructing an instance for non-parallel work) will eagerly construct the
custom ForkJoinPool and emit its INFO log line. The worker threads themselves
are only started once work is submitted, but the pool object is created up
front.
A lazy holder pattern would be cleaner:
  private static class PoolHolder {
    static final ForkJoinPool INSTANCE = initForkJoinPool();
  }
  // Access via PoolHolder.INSTANCE; only initializes on first use
This is thread-safe (guaranteed by the JLS class-initialization rules), needs
no explicit synchronization, and avoids creating the pool in code paths that
never call map() or mapParallel().
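To make the deferral concrete, here is a small self-contained sketch of the initialization-on-demand holder idiom. The class name, the counter, and the parallelism of 2 are assumptions added so that the lazy behavior can be observed; they are not part of the PR.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the lazy holder idiom; names are illustrative only.
final class LazyPoolSketch {
  // Counts how many times the pool has been constructed (for demonstration).
  static final AtomicInteger INIT_COUNT = new AtomicInteger();

  private LazyPoolSketch() {
  }

  // The nested class is not initialized until INSTANCE is first read, and the
  // JVM guarantees that this initialization happens exactly once.
  private static final class PoolHolder {
    static final ForkJoinPool INSTANCE = createPool();
  }

  private static ForkJoinPool createPool() {
    INIT_COUNT.incrementAndGet();
    return new ForkJoinPool(2); // arbitrary parallelism for illustration
  }

  static ForkJoinPool pool() {
    return PoolHolder.INSTANCE;
  }
}
```

Reading `LazyPoolSketch.INIT_COUNT` initializes only the outer class, so the counter stays at 0 until `pool()` is called for the first time; later calls return the same instance.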