This is an automated email from the ASF dual-hosted git repository.
nsivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0915f7776be2 fix: Isolate classloader-aware parallel execution in
HoodiePreCommitValidatorEngineContext (#18585)
0915f7776be2 is described below
commit 0915f7776be2a7d08ea0f23000deeceff7caa8c6
Author: ashokkumar-allu <[email protected]>
AuthorDate: Fri Jun 12 21:09:16 2026 -0500
fix: Isolate classloader-aware parallel execution in
HoodiePreCommitValidatorEngineContext (#18585)
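Move the classloader-aware parallel execution logic out of
HoodieLocalEngineContext into a new, dedicated
ExecutorServiceBasedEngineContext. The new context extends
HoodieEngineContext directly and runs its parallel operations (map,
flatMap, foreach, mapToPair, reduceByKey and the pair-reduce variants)
on a shared ExecutorService-backed pool of daemon threads whose context
classloader is the one that loaded ExecutorServiceBasedEngineContext.
SparkValidatorUtils now uses this context to run the configured
pre-commit validators in parallel.

SparkPreCommitValidator#validate now logs any exception other than
HoodieValidationException together with the validator class and
instant. It then rethrows the exception: runtime exceptions as-is, and
checked exceptions wrapped in a RuntimeException. This way the original
stack trace reaches the caller.

Keeping this logic out of HoodieLocalEngineContext avoids polluting it
with pre-commit-specific concerns, as suggested during review.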
Co-authored-by: gallu <[email protected]>
---
.../hudi/client/utils/SparkValidatorUtils.java | 38 +--
.../client/validator/SparkPreCommitValidator.java | 12 +
.../hudi/client/utils/TestSparkValidatorUtils.java | 209 +++++++++++++++
.../validator/TestSparkPreCommitValidator.java | 143 ++++++++++
.../engine/ExecutorServiceBasedEngineContext.java | 290 +++++++++++++++++++++
.../TestExecutorServiceBasedEngineContext.java | 112 ++++++++
.../engine/TestHoodieLocalEngineContext.java | 12 +-
7 files changed, 792 insertions(+), 24 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index 40ee7d8cefff..233f622beff6 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.ExecutorServiceBasedEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -50,7 +51,6 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -83,7 +83,7 @@ public class SparkValidatorUtils {
Dataset<Row> afterState = getRecordsFromPendingCommits(sqlContext,
partitionsModified, writeMetadata, table, instantTime);
Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext,
partitionsModified, table, afterState.schema());
- Stream<SparkPreCommitValidator> validators =
Arrays.stream(config.getPreCommitValidators().split(","))
+ List<SparkPreCommitValidator> validators =
Arrays.stream(config.getPreCommitValidators().split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.flatMap(validatorClass -> {
@@ -105,10 +105,12 @@ public class SparkValidatorUtils {
} catch (ReflectiveOperationException e) {
throw new HoodieValidationException("Failed to instantiate
validator: " + validatorClass, e);
}
- });
+ })
+ .collect(Collectors.toList());
- boolean allSuccess = validators.map(v -> runValidatorAsync(v,
writeMetadata, beforeState, afterState,
instantTime)).map(CompletableFuture::join)
- .reduce(true, Boolean::logicalAnd);
+ boolean allSuccess = new
ExecutorServiceBasedEngineContext(context.getStorageConf())
+ .map(validators, v -> runValidator(v, writeMetadata, beforeState,
afterState, instantTime), validators.size())
+ .stream().reduce(true, Boolean::logicalAnd);
if (allSuccess) {
LOG.info("All validations succeeded");
@@ -120,20 +122,20 @@ public class SparkValidatorUtils {
}
/**
- * Run validators in a separate thread pool for parallelism. Each of
validator can submit a distributed spark job if needed.
+ * Run a single validator synchronously in the calling thread; parallelism
across validators is
+ * provided by the {@link ExecutorServiceBasedEngineContext#map} call site.
Each validator may submit a distributed Spark
+ * job if needed.
*/
- private static CompletableFuture<Boolean>
runValidatorAsync(SparkPreCommitValidator validator, HoodieWriteMetadata<?>
writeMetadata,
- Dataset<Row>
beforeState, Dataset<Row> afterState, String instantTime) {
- return CompletableFuture.supplyAsync(() -> {
- try {
- validator.validate(instantTime, writeMetadata, beforeState,
afterState);
- LOG.info("validation complete for {}", validator.getClass().getName());
- return true;
- } catch (HoodieValidationException e) {
- LOG.error("validation failed for {}", validator.getClass().getName(),
e);
- return false;
- }
- });
+ private static boolean runValidator(SparkPreCommitValidator validator,
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata,
+ Dataset<Row> beforeState, Dataset<Row>
afterState, String instantTime) {
+ try {
+ validator.validate(instantTime, writeMetadata, beforeState, afterState);
+ LOG.info("validation complete for {}", validator.getClass().getName());
+ return true;
+ } catch (HoodieValidationException e) {
+ LOG.error("validation failed for {}", validator.getClass().getName(), e);
+ return false;
+ }
}
/**
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
index 7e1da34c2442..b06e80bf6392 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
@@ -82,6 +82,18 @@ public abstract class SparkPreCommitValidator<T, I, K, O
extends HoodieData<Writ
HoodieTimer timer = HoodieTimer.start();
try {
validateRecordsBeforeAndAfter(before, after,
getPartitionsModified(writeResult));
+ } catch (HoodieValidationException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ // Unexpected bug (NPE, ClassCastException, etc.) — re-throw as-is so it
propagates
+ // crash-loud with the original stack trace instead of being silently
swallowed as a
+ // generic "validation failed" message.
+ log.error("Validator {} threw unexpected exception for instant {}",
getClass().getName(), instantTime, e);
+ throw e;
+ } catch (Exception e) {
+ // Checked exception — promote to RuntimeException so it propagates
crash-loud.
+ log.error("Validator {} threw unexpected checked exception for instant
{}", getClass().getName(), instantTime, e);
+ throw new RuntimeException(e);
} finally {
long duration = timer.endTimer();
log.info(getClass() + " validator took " + duration + " ms" + ", metrics
on? " + getWriteConfig().isMetricsOn());
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
index c69756f63903..ac54baea9c34 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
@@ -20,18 +20,30 @@ package org.apache.hudi.client.utils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.SparkPreCommitValidator;
import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* Tests for {@link SparkValidatorUtils}.
@@ -95,4 +107,201 @@ public class TestSparkValidatorUtils extends
HoodieClientTestBase {
"Should have 2 commits (one with data, one empty)");
}
}
+
+ /**
+ * Verifies that two custom validators are both invoked in parallel via
ExecutorServiceBasedEngineContext
+ * without ClassNotFoundException, confirming that classloader-loaded user
validator classes execute correctly.
+ */
+ @Test
+ public void testTwoValidatorsBothInvoked() throws Exception {
+ CountingValidator.INVOCATION_COUNT.set(0);
+
+ HoodieWriteConfig configWithTwoValidators = getConfigBuilder()
+ .withPreCommitValidatorConfig(
+ HoodiePreCommitValidatorConfig.newBuilder()
+ .withPreCommitValidator(
+ CountingValidator.class.getName() + "," +
CountingValidator.class.getName())
+ .build())
+ .build();
+
+ try (SparkRDDWriteClient writeClient =
getHoodieWriteClient(configWithTwoValidators)) {
+ String commit = "001";
+ writeBatch(
+ writeClient,
+ commit,
+ "000",
+ Option.empty(),
+ "000",
+ 5,
+ generateWrapRecordsFn(false, configWithTwoValidators,
dataGen::generateInserts),
+ SparkRDDWriteClient::bulkInsert,
+ true,
+ 5,
+ 5,
+ 1,
+ false,
+ INSTANT_GENERATOR);
+ }
+
+ Assertions.assertEquals(2, CountingValidator.INVOCATION_COUNT.get(),
+ "Both configured validators must have been invoked in parallel");
+ }
+
+ /**
+ * Verifies that when a validator throws {@link HoodieValidationException},
it surfaces somewhere
+ * in the exception cause chain of the write operation.
+ * The write client wraps validator exceptions in a {@code
HoodieInsertException}, so we walk
+ * the cause chain rather than asserting the top-level type.
+ */
+ @Test
+ public void testValidatorFailurePropagatesException() throws Exception {
+ HoodieWriteConfig configWithFailingValidator = getConfigBuilder()
+ .withPreCommitValidatorConfig(
+ HoodiePreCommitValidatorConfig.newBuilder()
+ .withPreCommitValidator(FailingValidator.class.getName())
+ .build())
+ .build();
+
+ try (SparkRDDWriteClient writeClient =
getHoodieWriteClient(configWithFailingValidator)) {
+ String commit = "001";
+ Exception thrown = assertThrows(Exception.class, () ->
+ writeBatch(
+ writeClient,
+ commit,
+ "000",
+ Option.empty(),
+ "000",
+ 5,
+ generateWrapRecordsFn(false, configWithFailingValidator,
dataGen::generateInserts),
+ SparkRDDWriteClient::bulkInsert,
+ true,
+ 5,
+ 5,
+ 1,
+ false,
+ INSTANT_GENERATOR),
+ "A failing validator must cause the write operation to throw");
+
+ // Walk the cause chain: bulkInsert wraps the HoodieValidationException
in HoodieInsertException.
+ Throwable cause = thrown;
+ while (cause != null && !(cause instanceof HoodieValidationException)) {
+ cause = cause.getCause();
+ }
+ Assertions.assertNotNull(cause,
+ "HoodieValidationException must appear somewhere in the exception
cause chain");
+ Assertions.assertInstanceOf(HoodieValidationException.class, cause);
+ }
+ }
+
+ /**
+ * Verifies that when a validator throws an unexpected RuntimeException
(e.g. NPE or
+ * IllegalStateException — a validator bug), the exception is NOT silently
swallowed as a
+ * generic "validation failed" message. The original exception must appear
in the cause chain
+ * so operators can diagnose the real problem.
+ */
+ @Test
+ public void testUnexpectedValidatorExceptionIsNotSilenced() throws Exception
{
+ HoodieWriteConfig configWithBuggyValidator = getConfigBuilder()
+ .withPreCommitValidatorConfig(
+ HoodiePreCommitValidatorConfig.newBuilder()
+ .withPreCommitValidator(BuggyValidator.class.getName())
+ .build())
+ .build();
+
+ try (SparkRDDWriteClient writeClient =
getHoodieWriteClient(configWithBuggyValidator)) {
+ String commit = "001";
+ Exception thrown = assertThrows(Exception.class, () ->
+ writeBatch(
+ writeClient,
+ commit,
+ "000",
+ Option.empty(),
+ "000",
+ 5,
+ generateWrapRecordsFn(false, configWithBuggyValidator,
dataGen::generateInserts),
+ SparkRDDWriteClient::bulkInsert,
+ true,
+ 5,
+ 5,
+ 1,
+ false,
+ INSTANT_GENERATOR),
+ "A buggy validator must still cause the write to fail");
+
+ // The original IllegalStateException must be visible somewhere in the
cause chain.
+ // It must NOT be silently converted into a plain "At least one
pre-commit validation failed".
+ Throwable cause = thrown;
+ boolean foundOriginal = false;
+ while (cause != null) {
+ if (cause instanceof IllegalStateException
+ && "simulated bug in validator".equals(cause.getMessage())) {
+ foundOriginal = true;
+ break;
+ }
+ cause = cause.getCause();
+ }
+ Assertions.assertTrue(foundOriginal,
+ "The original IllegalStateException from the buggy validator must
appear in the cause chain, "
+ + "not be buried under a generic 'validation failed' message.
Full exception: " + thrown);
+ }
+ }
+
+ /**
+ * Minimal validator that records each invocation. Must be a public static
class so that
+ * ReflectionUtils can instantiate it by name during runValidators.
+ */
+ public static class CountingValidator<T, I, K, O extends
HoodieData<WriteStatus>>
+ extends SparkPreCommitValidator<T, I, K, O> {
+
+ static final AtomicInteger INVOCATION_COUNT = new AtomicInteger(0);
+
+ public CountingValidator(HoodieSparkTable<T> table, HoodieEngineContext
context,
+ HoodieWriteConfig config) {
+ super(table, context, config);
+ }
+
+ @Override
+ protected void validateRecordsBeforeAndAfter(Dataset<Row> before,
Dataset<Row> after,
+ Set<String>
partitionsAffected) {
+ INVOCATION_COUNT.incrementAndGet();
+ }
+ }
+
+ /**
+ * Validator that always fails with {@link HoodieValidationException}. Must
be a public static
+ * class so that ReflectionUtils can instantiate it by name during
runValidators.
+ */
+ public static class FailingValidator<T, I, K, O extends
HoodieData<WriteStatus>>
+ extends SparkPreCommitValidator<T, I, K, O> {
+
+ public FailingValidator(HoodieSparkTable<T> table, HoodieEngineContext
context,
+ HoodieWriteConfig config) {
+ super(table, context, config);
+ }
+
+ @Override
+ protected void validateRecordsBeforeAndAfter(Dataset<Row> before,
Dataset<Row> after,
+ Set<String>
partitionsAffected) {
+ throw new HoodieValidationException("intentional failure from
FailingValidator");
+ }
+ }
+
+ /**
+ * Validator that throws an unexpected RuntimeException (simulates a
validator bug such as NPE).
+ * Must be a public static class so that ReflectionUtils can instantiate it
by name.
+ */
+ public static class BuggyValidator<T, I, K, O extends
HoodieData<WriteStatus>>
+ extends SparkPreCommitValidator<T, I, K, O> {
+
+ public BuggyValidator(HoodieSparkTable<T> table, HoodieEngineContext
context,
+ HoodieWriteConfig config) {
+ super(table, context, config);
+ }
+
+ @Override
+ protected void validateRecordsBeforeAndAfter(Dataset<Row> before,
Dataset<Row> after,
+ Set<String>
partitionsAffected) {
+ throw new IllegalStateException("simulated bug in validator");
+ }
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/validator/TestSparkPreCommitValidator.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/validator/TestSparkPreCommitValidator.java
new file mode 100644
index 000000000000..83ee85302cbe
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/validator/TestSparkPreCommitValidator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for exception-handling behavior in {@link
SparkPreCommitValidator#validate}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestSparkPreCommitValidator {
+
+ @Mock
+ @SuppressWarnings("rawtypes")
+ private HoodieSparkTable table;
+
+ @Mock
+ private HoodieEngineContext engineContext;
+
+ @Mock
+ private HoodieWriteConfig writeConfig;
+
+ @Mock
+ @SuppressWarnings("rawtypes")
+ private HoodieWriteMetadata writeMetadata;
+
+ @BeforeEach
+ @SuppressWarnings("unchecked")
+ void setUp() {
+ when(writeConfig.getTableName()).thenReturn("test-table");
+ when(writeConfig.isMetricsOn()).thenReturn(false);
+
when(writeMetadata.getWriteStats()).thenReturn(Option.of(Collections.emptyList()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testValidateSucceeds() {
+ SparkPreCommitValidator<?, ?, ?, HoodieData<WriteStatus>> validator =
+ new NoOpValidator(table, engineContext, writeConfig);
+ assertDoesNotThrow(() -> validator.validate("001", writeMetadata, null,
null),
+ "validate must not throw when validateRecordsBeforeAndAfter completes
normally");
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testValidateRethrowsUnexpectedRuntimeException() {
+ RuntimeException cause = new RuntimeException("disk full");
+ SparkPreCommitValidator<?, ?, ?, HoodieData<WriteStatus>> validator =
+ new ThrowingValidator(table, engineContext, writeConfig, cause);
+
+ RuntimeException ex = assertThrows(RuntimeException.class,
+ () -> validator.validate("001", writeMetadata, null, null));
+ assertSame(cause, ex,
+ "unexpected RuntimeException must propagate as-is so the operator sees
the original stack trace, "
+ + "not a generic 'validation failed' message");
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testValidateReThrowsValidationException() {
+ HoodieValidationException original = new HoodieValidationException("bad
data");
+ SparkPreCommitValidator<?, ?, ?, HoodieData<WriteStatus>> validator =
+ new ThrowingValidator(table, engineContext, writeConfig, original);
+
+ HoodieValidationException ex =
assertThrows(HoodieValidationException.class,
+ () -> validator.validate("001", writeMetadata, null, null));
+ assertSame(original, ex,
+ "HoodieValidationException must be rethrown as-is without additional
wrapping");
+ }
+
+ /** Minimal concrete validator that completes normally. */
+ private static class NoOpValidator<T, I, K, O extends
HoodieData<WriteStatus>>
+ extends SparkPreCommitValidator<T, I, K, O> {
+
+ NoOpValidator(HoodieSparkTable<T> table, HoodieEngineContext context,
HoodieWriteConfig config) {
+ super(table, context, config);
+ }
+
+ @Override
+ protected void validateRecordsBeforeAndAfter(Dataset<Row> before,
Dataset<Row> after,
+ Set<String>
partitionsAffected) {
+ // no-op — validation always passes
+ }
+ }
+
+ /** Minimal concrete validator that throws a fixed exception from
validateRecordsBeforeAndAfter. */
+ private static class ThrowingValidator<T, I, K, O extends
HoodieData<WriteStatus>>
+ extends SparkPreCommitValidator<T, I, K, O> {
+
+ private final RuntimeException toThrow;
+
+ ThrowingValidator(HoodieSparkTable<T> table, HoodieEngineContext context,
+ HoodieWriteConfig config, RuntimeException toThrow) {
+ super(table, context, config);
+ this.toThrow = toThrow;
+ }
+
+ @Override
+ protected void validateRecordsBeforeAndAfter(Dataset<Row> before,
Dataset<Row> after,
+ Set<String>
partitionsAffected) {
+ throw toThrow;
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/ExecutorServiceBasedEngineContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/ExecutorServiceBasedEngineContext.java
new file mode 100644
index 000000000000..c6ba0d736acf
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/ExecutorServiceBasedEngineContext.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.engine;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.data.HoodieListPairData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.function.FunctionWrapper;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableConsumer;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A general-purpose {@link HoodieEngineContext} that executes all parallel
operations on a
+ * dedicated classloader-aware {@link ExecutorService}, so classes resolved by
the application
+ * classloader remain visible to worker threads on Java 11+.
+ *
+ * <p>The pool is lazily created once per JVM (JLS §12.4.2) and shared across
all instances.
+ * Worker threads are daemon threads and carry the classloader of this class,
preventing
+ * {@code ClassNotFoundException} that occurs when the common {@link
ForkJoinPool} is used
+ * because its workers do not inherit the submitting thread's context
classloader.
+ */
+public class ExecutorServiceBasedEngineContext extends HoodieEngineContext {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ExecutorServiceBasedEngineContext.class);
+
+ // Lazy-initialized, daemon, fixed thread pool whose workers carry the
correct classloader.
+ // JLS §12.4.2 guarantees thread-safe initialization via class-loading locks.
+ private static class PoolHolder {
+ static final ExecutorService INSTANCE = createExecutorService();
+ }
+
+ private static ExecutorService createExecutorService() {
+ int parallelism = ForkJoinPool.commonPool().getParallelism();
+ ClassLoader cl = ExecutorServiceBasedEngineContext.class.getClassLoader();
+ ExecutorService executor = Executors.newFixedThreadPool(parallelism, r -> {
+ Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setContextClassLoader(cl);
+ t.setDaemon(true);
+ return t;
+ });
+ LOG.info("Created ExecutorServiceBasedEngineContext pool with {} threads",
parallelism);
+ return executor;
+ }
+
+ public ExecutorServiceBasedEngineContext(StorageConfiguration<?> conf) {
+ super(conf, new LocalTaskContextSupplier());
+ }
+
+ // ---- Core parallel helpers ----
+
+ /**
+ * Submits each element to the executor pool and collects results in input
order.
+ * RuntimeExceptions / Errors from {@code func} are re-thrown as-is after
unwrapping
+ * the {@link CompletionException} wrapper.
+ */
+ private <I, O> List<O> mapAsync(List<I> data, Function<I, O> func) {
+ List<CompletableFuture<O>> futures = data.stream()
+ .map(item -> CompletableFuture.supplyAsync(() -> func.apply(item),
PoolHolder.INSTANCE))
+ .collect(Collectors.toList());
+ try {
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).join();
+ } catch (CompletionException e) {
+ throw rethrowUnwrapped(e);
+ }
+ return
futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
+ }
+
+ private static RuntimeException rethrowUnwrapped(CompletionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ }
+ if (cause instanceof Error) {
+ throw (Error) cause;
+ }
+ throw e;
+ }
+
+ // ---- HoodieEngineContext implementations ----
+
+ @Override
+ public HoodieAccumulator newAccumulator() {
+ return HoodieAtomicLongAccumulator.create();
+ }
+
+ @Override
+ public <T> HoodieData<T> emptyHoodieData() {
+ return HoodieListData.eager(Collections.emptyList());
+ }
+
+ @Override
+ public <K, V> HoodiePairData<K, V> emptyHoodiePairData() {
+ return HoodieListPairData.eager(Collections.emptyList());
+ }
+
+ @Override
+ public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
+ return HoodieListData.eager(data);
+ }
+
+ @Override
+ public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int
parallelism) {
+ return mapAsync(data, FunctionWrapper.throwingMapWrapper(func));
+ }
+
+ @Override
+ public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data,
SerializablePairFunction<I, K, V> mapToPairFunc,
+ SerializableBiFunction<V,
V, V> reduceFunc, int parallelism) {
+ List<Pair<K, V>> pairs = mapAsync(data,
FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc));
+ return pairs.stream()
+ .collect(Collectors.groupingBy(Pair::getKey)).values().stream()
+ .map(list -> list.stream().map(Pair::getValue)
+
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public <I, K, V> Stream<ImmutablePair<K, V>>
mapPartitionsToPairAndReduceByKey(
+ Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V>
flatMapToPairFunc,
+ SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ try {
+ return CompletableFuture.supplyAsync(() ->
+
FunctionWrapper.throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.iterator())
+ .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+ .map(entry -> new ImmutablePair<>(entry.getKey(),
+ entry.getValue().stream().map(Pair::getValue)
+
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)))
+ .filter(Objects::nonNull),
+ PoolHolder.INSTANCE).join();
+ } catch (CompletionException e) {
+ throw rethrowUnwrapped(e);
+ }
+ }
+
+ @Override
+ public <I, K, V> List<V> reduceByKey(List<Pair<K, V>> data,
SerializableBiFunction<V, V, V> reduceFunc,
+ int parallelism) {
+ // Group by key (sequential), then reduce each group in parallel on the
executor.
+ Map<K, List<V>> grouped = data.stream()
+ .collect(Collectors.groupingBy(Pair::getKey,
+ Collectors.mapping(Pair::getValue, Collectors.toList())));
+ return mapAsync(new ArrayList<>(grouped.entrySet()),
+ entry -> entry.getValue().stream()
+
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))
+ .stream().filter(Objects::nonNull).collect(Collectors.toList());
+ }
+
+ @Override
+ public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I,
Stream<O>> func, int parallelism) {
+ return mapAsync(data, FunctionWrapper.throwingFlatMapWrapper(func))
+ .stream().flatMap(s -> s).collect(Collectors.toList());
+ }
+
+ @Override
+ public <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int
parallelism) {
+ List<CompletableFuture<Void>> futures = data.stream()
+ .map(item -> CompletableFuture.runAsync(
+ () ->
FunctionWrapper.throwingForeachWrapper(consumer).accept(item),
PoolHolder.INSTANCE))
+ .collect(Collectors.toList());
+ try {
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).join();
+ } catch (CompletionException e) {
+ throw rethrowUnwrapped(e);
+ }
+ }
+
+ @Override
+ public <I, K, V> Map<K, V> mapToPair(List<I> data,
SerializablePairFunction<I, K, V> func, Integer parallelism) {
+ return mapAsync(data, FunctionWrapper.throwingMapToPairWrapper(func))
+ .stream().collect(Collectors.toMap(Pair::getLeft, Pair::getRight,
(oldVal, newVal) -> newVal));
+ }
+
+ @Override
+ public void setProperty(EngineProperty key, String value) {
+ // no operation
+ }
+
+ @Override
+ public Option<String> getProperty(EngineProperty key) {
+ return Option.empty();
+ }
+
+ @Override
+ public void setJobStatus(String activeModule, String activityDescription) {
+ // no operation
+ }
+
+ @Override
+ public void clearJobStatus() {
+ // no operation
+ }
+
+ @Override
+ public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
+ // no operation
+ }
+
+ @Override
+ public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void cancelJob(String jobId) {
+ // no operation
+ }
+
+ @Override
+ public void cancelAllJobs() {
+ // no operation
+ }
+
+ @Override
+ public <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp,
+ Functions.Function2<O, O, O> combOp) {
+ return data.collectAsList().stream().reduce(zeroValue, seqOp::apply,
combOp::apply);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> ReaderContextFactory<T>
getReaderContextFactory(HoodieTableMetaClient metaClient) {
+ return (ReaderContextFactory<T>) getEngineReaderContextFactory(metaClient);
+ }
+
+ @Override
+ public ReaderContextFactory<?>
getEngineReaderContextFactory(HoodieTableMetaClient metaClient) {
+ return new AvroReaderContextFactory(metaClient, new TypedProperties());
+ }
+
+ @Override
+ public KeyGenerator createKeyGenerator(TypedProperties props) throws
IOException {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestExecutorServiceBasedEngineContext.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestExecutorServiceBasedEngineContext.java
new file mode 100644
index 000000000000..475aa4c9d2dc
--- /dev/null
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestExecutorServiceBasedEngineContext.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.engine;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorage;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link ExecutorServiceBasedEngineContext#map}: result correctness and ordering,
+ * exception wrapping, and the context classloader seen by worker threads.
+ */
+public class TestExecutorServiceBasedEngineContext {
+
+  private ExecutorServiceBasedEngineContext context;
+
+  @BeforeEach
+  void setUp() {
+    HoodieStorage storage = getDefaultStorage();
+    context = new ExecutorServiceBasedEngineContext(storage.getConf());
+  }
+
+  @Test
+  void testMapHappyPath() {
+    List<Integer> result = context.map(Arrays.asList(1, 2, 3), x -> x * 2, 3);
+    assertEquals(Arrays.asList(2, 4, 6), result.stream().sorted().collect(Collectors.toList()));
+  }
+
+  @Test
+  void testMapEmptyList() {
+    List<Integer> result = context.map(Collections.<Integer>emptyList(), x -> x * 2, 0);
+    assertTrue(result.isEmpty(), "map over empty list must return empty list");
+  }
+
+  @Test
+  void testMapPreservesInputOrder() {
+    List<Integer> input = IntStream.range(0, 100).boxed().collect(Collectors.toList());
+    List<Integer> result = context.map(input, x -> x, 100);
+    assertEquals(input, result, "map must return results in the same order as the input list");
+  }
+
+  @Test
+  void testMapPropagatesRuntimeException() {
+    assertMapWrapsInHoodieException(new RuntimeException("boom"));
+  }
+
+  @Test
+  void testMapPropagatesHoodieException() {
+    assertMapWrapsInHoodieException(new HoodieException("hoodie-boom"));
+  }
+
+  @Test
+  void testMapWrapsCheckedException() {
+    assertMapWrapsInHoodieException(new Exception("checked!"));
+  }
+
+  @Test
+  void testWorkerThreadClassloader() {
+    // Single-slot array so the lambda can publish the classloader it observed.
+    ClassLoader[] captured = new ClassLoader[1];
+    context.map(Collections.singletonList(1), x -> {
+      captured[0] = Thread.currentThread().getContextClassLoader();
+      return x;
+    }, 1);
+    assertEquals(ExecutorServiceBasedEngineContext.class.getClassLoader(), captured[0],
+        "Worker threads must use ExecutorServiceBasedEngineContext classloader to avoid ClassNotFoundException on Java 11+");
+  }
+
+  /**
+   * Maps a single element with a function that always throws {@code cause}. Verifies that the
+   * failure surfaces as a {@link HoodieException} whose direct cause is exactly {@code cause}.
+   *
+   * <p>throwingMapWrapper wraps ALL exceptions in HoodieException, including checked exceptions
+   * and HoodieException itself, so the original is always the direct cause.
+   *
+   * @param cause exception thrown from inside the mapping function
+   */
+  private void assertMapWrapsInHoodieException(Exception cause) {
+    RuntimeException thrown = assertThrows(RuntimeException.class, () ->
+        context.map(Collections.singletonList(1), x -> {
+          throw cause;
+        }, 1));
+    assertInstanceOf(HoodieException.class, thrown);
+    assertSame(cause, thrown.getCause());
+  }
+}
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestHoodieLocalEngineContext.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestHoodieLocalEngineContext.java
index bdb858c763e4..c4fc1e285f7c 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestHoodieLocalEngineContext.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestHoodieLocalEngineContext.java
@@ -150,24 +150,24 @@ public class TestHoodieLocalEngineContext {
ImmutablePair.of("key1", 42),
ImmutablePair.of("key2", 17)
);
-
+
HoodiePairData<String, Integer> pairData =
HoodieListPairData.lazy(singleValuePairs);
-
+
// Create a function that just returns the values
SerializableFunction<Iterator<Integer>, Iterator<Integer>> func = iterator
-> {
List<Integer> values = new ArrayList<>();
iterator.forEachRemaining(values::add);
return values.iterator();
};
-
+
List<String> shardIndices = Arrays.asList("key1", "key2");
HoodieData<Integer> result = context.mapGroupsByKey(pairData, func,
shardIndices, false);
-
+
List<Integer> resultList = result.collectAsList();
-
+
// Verify the results
assertEquals(2, resultList.size());
assertTrue(resultList.contains(42));
assertTrue(resultList.contains(17));
}
-}
\ No newline at end of file
+}