This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 86c29d74667a439398d8a7b015b1ff1d97abe138
Author: Asish Kumar <[email protected]>
AuthorDate: Fri May 15 06:50:35 2026 +0530

    fix: clear Hive work map after combine split failures (#18719)
    
    * fix: clear Hive work map after combine split failures
    
    HoodieCombineHiveInputFormat cleared Hive's work map only after successful
    split generation, leaving ThreadLocal work state behind when split
    classification or generation failed.
    
    Move the cleanup into a finally block and add a regression test that forces
    getSplits to fail and then verifies that Utilities.clearWorkMapForConf is
    still invoked.
---
 .../hadoop/hive/HoodieCombineHiveInputFormat.java  | 174 +++++++++++----------
 .../hive/TestHoodieCombineHiveInputFormat.java     |  32 ++++
 2 files changed, 120 insertions(+), 86 deletions(-)

diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index a71695be427e..9634b7f6b097 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -324,111 +324,113 @@ public class HoodieCombineHiveInputFormat<K extends 
WritableComparable, V extend
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
-    init(job);
 
-    List<InputSplit> result = new ArrayList<>();
+    try {
+      init(job);
+      List<InputSplit> result = new ArrayList<>();
 
-    Path[] paths = getInputPaths(job);
+      Path[] paths = getInputPaths(job);
 
-    List<Path> nonCombinablePaths = new ArrayList<>(paths.length / 2);
-    List<Path> combinablePaths = new ArrayList<>(paths.length / 2);
+      List<Path> nonCombinablePaths = new ArrayList<>(paths.length / 2);
+      List<Path> combinablePaths = new ArrayList<>(paths.length / 2);
 
-    int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
-        (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD));
+      int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
+          (int) Math.ceil((double) paths.length / 
DEFAULT_NUM_PATH_PER_THREAD));
 
-    // This check is necessary because for Spark branch, the result array from
-    // getInputPaths() above could be empty, and therefore numThreads could be 
0.
-    // In that case, Executors.newFixedThreadPool will fail.
-    if (numThreads > 0) {
-      try {
-        Set<Integer> nonCombinablePathIndices = 
getNonCombinablePathIndices(job, paths, numThreads);
-        for (int i = 0; i < paths.length; i++) {
-          if (nonCombinablePathIndices.contains(i)) {
-            nonCombinablePaths.add(paths[i]);
-          } else {
-            combinablePaths.add(paths[i]);
+      // This check is necessary because for Spark branch, the result array 
from
+      // getInputPaths() above could be empty, and therefore numThreads could 
be 0.
+      // In that case, Executors.newFixedThreadPool will fail.
+      if (numThreads > 0) {
+        try {
+          Set<Integer> nonCombinablePathIndices = 
getNonCombinablePathIndices(job, paths, numThreads);
+          for (int i = 0; i < paths.length; i++) {
+            if (nonCombinablePathIndices.contains(i)) {
+              nonCombinablePaths.add(paths[i]);
+            } else {
+              combinablePaths.add(paths[i]);
+            }
           }
+        } catch (Exception e) {
+          LOG.error("Error checking non-combinable path", e);
+          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
+          throw new IOException(e);
         }
-      } catch (Exception e) {
-        LOG.error("Error checking non-combinable path", e);
-        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
-        throw new IOException(e);
       }
-    }
 
-    // Store the previous value for the path specification
-    String oldPaths = 
job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
-    LOG.debug("The received input paths are: [{}] against the property {}", 
oldPaths,
-        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
-
-    // Process the normal splits
-    if (nonCombinablePaths.size() > 0) {
-      FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray(new 
Path[0]));
-      InputSplit[] splits = super.getSplits(job, numSplits);
-      Collections.addAll(result, splits);
-    }
+      // Store the previous value for the path specification
+      String oldPaths = 
job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
+      LOG.debug("The received input paths are: [{}] against the property {}", 
oldPaths,
+          org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
 
-    // Process the combine splits
-    if (combinablePaths.size() > 0) {
-      FileInputFormat.setInputPaths(job, combinablePaths.toArray(new Path[0]));
-      Map<Path, PartitionDesc> pathToPartitionInfo = this.pathToPartitionInfo 
!= null ? this.pathToPartitionInfo
-          : Utilities.getMapWork(job).getPathToPartitionInfo();
-      InputSplit[] splits = getCombineSplits(job, numSplits, 
pathToPartitionInfo);
-      Collections.addAll(result, splits);
-    }
+      // Process the normal splits
+      if (nonCombinablePaths.size() > 0) {
+        FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray(new 
Path[0]));
+        InputSplit[] splits = super.getSplits(job, numSplits);
+        Collections.addAll(result, splits);
+      }
 
-    // Restore the old path information back
-    // This is just to prevent incompatibilities with previous versions Hive
-    // if some application depends on the original value being set.
-    if (oldPaths != null) {
-      job.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, 
oldPaths);
-    }
+      // Process the combine splits
+      if (combinablePaths.size() > 0) {
+        FileInputFormat.setInputPaths(job, combinablePaths.toArray(new 
Path[0]));
+        Map<Path, PartitionDesc> pathToPartitionInfo = 
this.pathToPartitionInfo != null ? this.pathToPartitionInfo
+            : Utilities.getMapWork(job).getPathToPartitionInfo();
+        InputSplit[] splits = getCombineSplits(job, numSplits, 
pathToPartitionInfo);
+        Collections.addAll(result, splits);
+      }
 
-    // clear work from ThreadLocal after splits generated in case of thread is 
reused in pool.
-    Utilities.clearWorkMapForConf(job);
+      // Restore the old path information back
+      // This is just to prevent incompatibilities with previous versions Hive
+      // if some application depends on the original value being set.
+      if (oldPaths != null) {
+        
job.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, 
oldPaths);
+      }
 
-    // build internal schema for the query
-    if (!result.isEmpty()) {
-      ArrayList<String> uniqTablePaths = new ArrayList<>();
-      Arrays.stream(paths).forEach(path -> {
-        final HoodieStorage storage;
-        try {
-          FileSystem fs = path.getFileSystem(job);
-          storage = HoodieStorageUtils.getStorage(
-              HadoopFSUtils.convertToStoragePath(path), 
HadoopFSUtils.getStorageConf(fs.getConf()));
-          Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, 
HadoopFSUtils.convertToStoragePath(path));
-          if (tablePath.isPresent()) {
-            uniqTablePaths.add(tablePath.get().toUri().toString());
+      // build internal schema for the query
+      if (!result.isEmpty()) {
+        ArrayList<String> uniqTablePaths = new ArrayList<>();
+        Arrays.stream(paths).forEach(path -> {
+          final HoodieStorage storage;
+          try {
+            FileSystem fs = path.getFileSystem(job);
+            storage = HoodieStorageUtils.getStorage(
+                HadoopFSUtils.convertToStoragePath(path), 
HadoopFSUtils.getStorageConf(fs.getConf()));
+            Option<StoragePath> tablePath = 
TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
+            if (tablePath.isPresent()) {
+              uniqTablePaths.add(tablePath.get().toUri().toString());
+            }
+          } catch (IOException e) {
+            throw new RuntimeException(e);
           }
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      });
+        });
 
-      try {
-        for (String path : uniqTablePaths) {
-          HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(path).setConf(new 
HadoopStorageConfiguration(job)).build();
-          TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-          String avroSchema = schemaUtil.getTableSchema().toString();
-          Option<InternalSchema> internalSchema = 
schemaUtil.getTableInternalSchemaFromCommitMetadata();
-          if (internalSchema.isPresent()) {
-            LOG.info("Set internal and avro schema cache with path: {}", path);
-            job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema);
-            job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, 
SerDeHelper.toJson(internalSchema.get()));
-          } else {
-            // always sets up the cache so that we can distinguish with the 
scenario where the cache was never set(e.g. in tests).
-            job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
-            job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
+        try {
+          for (String path : uniqTablePaths) {
+            HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(path).setConf(new 
HadoopStorageConfiguration(job)).build();
+            TableSchemaResolver schemaUtil = new 
TableSchemaResolver(metaClient);
+            String avroSchema = schemaUtil.getTableSchema().toString();
+            Option<InternalSchema> internalSchema = 
schemaUtil.getTableInternalSchemaFromCommitMetadata();
+            if (internalSchema.isPresent()) {
+              LOG.info("Set internal and avro schema cache with path: {}", 
path);
+              job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema);
+              job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, 
SerDeHelper.toJson(internalSchema.get()));
+            } else {
+              // always sets up the cache so that we can distinguish with the 
scenario where the cache was never set(e.g. in tests).
+              job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
+              job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
+            }
           }
+        } catch (Exception e) {
+          LOG.warn("Failed to set schema cache", e);
         }
-      } catch (Exception e) {
-        LOG.warn("Failed to set schema cache", e);
       }
-    }
 
-    LOG.info("Number of all splits {}", result.size());
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
-    return result.toArray(new InputSplit[result.size()]);
+      LOG.info("Number of all splits {}", result.size());
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
+      return result.toArray(new InputSplit[result.size()]);
+    } finally {
+      // Clear work from ThreadLocal after each getSplits attempt, in case the 
thread is reused in a pool.
+      Utilities.clearWorkMapForConf(job);
+    }
   }
 
   private void processPaths(JobConf job, CombineFileInputFormatShim combine, 
List<CombineFileSplit> iss, Path... path)
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
index bf331eb55c84..44ceac534004 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
@@ -67,6 +67,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
@@ -80,7 +82,13 @@ import static 
org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK;
 import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_MAPPER_CLASS;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
 
 public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
 
@@ -119,6 +127,30 @@ public class TestHoodieCombineHiveInputFormat extends 
HoodieCommonTestHarness {
     }
   }
 
+  @Test
+  public void testClearWorkMapForConfOnGetSplitsFailure() throws Exception {
+    StorageConfiguration<Configuration> conf = 
HoodieTestUtils.getDefaultStorageConf();
+    File inputDir = tempDir.resolve("input").toFile();
+    assertTrue(inputDir.mkdirs());
+
+    MapredWork mrwork = new MapredWork();
+    Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
+    Utilities.setMapRedWork(conf.unwrap(), mrwork, mapWorkPath);
+    JobConf jobConf = new JobConf(conf.unwrap());
+    FileInputFormat.setInputPaths(jobConf, inputDir.getPath());
+    jobConf.set(HAS_MAP_WORK, "true");
+    jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
+
+    HoodieCombineHiveInputFormat combineHiveInputFormat = spy(new 
HoodieCombineHiveInputFormat());
+    doThrow(new RuntimeException("path classification 
failed")).when(combineHiveInputFormat)
+        .getNonCombinablePathIndices(eq(jobConf), any(Path[].class), anyInt());
+
+    try (MockedStatic<Utilities> utilities = 
Mockito.mockStatic(Utilities.class, Mockito.CALLS_REAL_METHODS)) {
+      assertThrows(IOException.class, () -> 
combineHiveInputFormat.getSplits(jobConf, 1));
+      utilities.verify(() -> Utilities.clearWorkMapForConf(jobConf));
+    }
+  }
+
   @Test
   public void testInternalSchemaCacheForMR() throws Exception {
     // test for HUDI-8182

Reply via email to