hudi-agent commented on code in PR #18719:
URL: https://github.com/apache/hudi/pull/18719#discussion_r3237123833
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java:
##########
@@ -324,111 +324,113 @@ public Set<Integer> getNonCombinablePathIndices(JobConf
job, Path[] paths, int n
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException
{
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
- init(job);
- List<InputSplit> result = new ArrayList<>();
+ try {
+ init(job);
+ List<InputSplit> result = new ArrayList<>();
- Path[] paths = getInputPaths(job);
+ Path[] paths = getInputPaths(job);
- List<Path> nonCombinablePaths = new ArrayList<>(paths.length / 2);
- List<Path> combinablePaths = new ArrayList<>(paths.length / 2);
+ List<Path> nonCombinablePaths = new ArrayList<>(paths.length / 2);
+ List<Path> combinablePaths = new ArrayList<>(paths.length / 2);
- int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
- (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD));
+ int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
+ (int) Math.ceil((double) paths.length /
DEFAULT_NUM_PATH_PER_THREAD));
- // This check is necessary because for Spark branch, the result array from
- // getInputPaths() above could be empty, and therefore numThreads could be
0.
- // In that case, Executors.newFixedThreadPool will fail.
- if (numThreads > 0) {
- try {
- Set<Integer> nonCombinablePathIndices =
getNonCombinablePathIndices(job, paths, numThreads);
- for (int i = 0; i < paths.length; i++) {
- if (nonCombinablePathIndices.contains(i)) {
- nonCombinablePaths.add(paths[i]);
- } else {
- combinablePaths.add(paths[i]);
+ // This check is necessary because for Spark branch, the result array
from
+ // getInputPaths() above could be empty, and therefore numThreads could
be 0.
+ // In that case, Executors.newFixedThreadPool will fail.
+ if (numThreads > 0) {
+ try {
+ Set<Integer> nonCombinablePathIndices =
getNonCombinablePathIndices(job, paths, numThreads);
+ for (int i = 0; i < paths.length; i++) {
+ if (nonCombinablePathIndices.contains(i)) {
+ nonCombinablePaths.add(paths[i]);
+ } else {
+ combinablePaths.add(paths[i]);
+ }
}
+ } catch (Exception e) {
+ LOG.error("Error checking non-combinable path", e);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
+ throw new IOException(e);
}
- } catch (Exception e) {
- LOG.error("Error checking non-combinable path", e);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
- throw new IOException(e);
}
- }
- // Store the previous value for the path specification
- String oldPaths =
job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
- LOG.debug("The received input paths are: [{}] against the property {}",
oldPaths,
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
-
- // Process the normal splits
- if (nonCombinablePaths.size() > 0) {
- FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray(new
Path[0]));
- InputSplit[] splits = super.getSplits(job, numSplits);
- Collections.addAll(result, splits);
- }
+ // Store the previous value for the path specification
+ String oldPaths =
job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
+ LOG.debug("The received input paths are: [{}] against the property {}",
oldPaths,
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
- // Process the combine splits
- if (combinablePaths.size() > 0) {
- FileInputFormat.setInputPaths(job, combinablePaths.toArray(new Path[0]));
- Map<Path, PartitionDesc> pathToPartitionInfo = this.pathToPartitionInfo
!= null ? this.pathToPartitionInfo
- : Utilities.getMapWork(job).getPathToPartitionInfo();
- InputSplit[] splits = getCombineSplits(job, numSplits,
pathToPartitionInfo);
- Collections.addAll(result, splits);
- }
+ // Process the normal splits
+ if (nonCombinablePaths.size() > 0) {
+ FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray(new
Path[0]));
+ InputSplit[] splits = super.getSplits(job, numSplits);
+ Collections.addAll(result, splits);
+ }
- // Restore the old path information back
- // This is just to prevent incompatibilities with previous versions Hive
- // if some application depends on the original value being set.
- if (oldPaths != null) {
- job.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
oldPaths);
- }
+ // Process the combine splits
+ if (combinablePaths.size() > 0) {
+ FileInputFormat.setInputPaths(job, combinablePaths.toArray(new
Path[0]));
+ Map<Path, PartitionDesc> pathToPartitionInfo =
this.pathToPartitionInfo != null ? this.pathToPartitionInfo
+ : Utilities.getMapWork(job).getPathToPartitionInfo();
+ InputSplit[] splits = getCombineSplits(job, numSplits,
pathToPartitionInfo);
+ Collections.addAll(result, splits);
+ }
- // clear work from ThreadLocal after splits generated in case of thread is
reused in pool.
- Utilities.clearWorkMapForConf(job);
+ // Restore the old path information back
+ // This is just to prevent incompatibilities with previous versions Hive
+ // if some application depends on the original value being set.
+ if (oldPaths != null) {
+
job.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
oldPaths);
+ }
- // build internal schema for the query
- if (!result.isEmpty()) {
- ArrayList<String> uniqTablePaths = new ArrayList<>();
- Arrays.stream(paths).forEach(path -> {
- final HoodieStorage storage;
- try {
- FileSystem fs = path.getFileSystem(job);
- storage = HoodieStorageUtils.getStorage(
- HadoopFSUtils.convertToStoragePath(path),
HadoopFSUtils.getStorageConf(fs.getConf()));
- Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage,
HadoopFSUtils.convertToStoragePath(path));
- if (tablePath.isPresent()) {
- uniqTablePaths.add(tablePath.get().toUri().toString());
+ // build internal schema for the query
+ if (!result.isEmpty()) {
+ ArrayList<String> uniqTablePaths = new ArrayList<>();
+ Arrays.stream(paths).forEach(path -> {
+ final HoodieStorage storage;
+ try {
+ FileSystem fs = path.getFileSystem(job);
+ storage = HoodieStorageUtils.getStorage(
+ HadoopFSUtils.convertToStoragePath(path),
HadoopFSUtils.getStorageConf(fs.getConf()));
+ Option<StoragePath> tablePath =
TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
+ if (tablePath.isPresent()) {
+ uniqTablePaths.add(tablePath.get().toUri().toString());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
+ });
- try {
- for (String path : uniqTablePaths) {
- HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setBasePath(path).setConf(new
HadoopStorageConfiguration(job)).build();
- TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
- String avroSchema = schemaUtil.getTableSchema().toString();
- Option<InternalSchema> internalSchema =
schemaUtil.getTableInternalSchemaFromCommitMetadata();
- if (internalSchema.isPresent()) {
- LOG.info("Set internal and avro schema cache with path: {}", path);
- job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema);
- job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path,
SerDeHelper.toJson(internalSchema.get()));
- } else {
- // always sets up the cache so that we can distinguish with the
scenario where the cache was never set(e.g. in tests).
- job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
- job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
+ try {
+ for (String path : uniqTablePaths) {
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setBasePath(path).setConf(new
HadoopStorageConfiguration(job)).build();
+ TableSchemaResolver schemaUtil = new
TableSchemaResolver(metaClient);
+ String avroSchema = schemaUtil.getTableSchema().toString();
+ Option<InternalSchema> internalSchema =
schemaUtil.getTableInternalSchemaFromCommitMetadata();
+ if (internalSchema.isPresent()) {
+ LOG.info("Set internal and avro schema cache with path: {}",
path);
+ job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema);
+ job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path,
SerDeHelper.toJson(internalSchema.get()));
+ } else {
+ // always sets up the cache so that we can distinguish with the
scenario where the cache was never set(e.g. in tests).
+ job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
+ job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
+ }
}
+ } catch (Exception e) {
+ LOG.warn("Failed to set schema cache", e);
}
- } catch (Exception e) {
- LOG.warn("Failed to set schema cache", e);
}
- }
- LOG.info("Number of all splits {}", result.size());
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
- return result.toArray(new InputSplit[result.size()]);
+ LOG.info("Number of all splits {}", result.size());
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
+ return result.toArray(new InputSplit[result.size()]);
+ } finally {
+ // Clear work from ThreadLocal after splits are generated in case the
thread is reused in a pool.
+ Utilities.clearWorkMapForConf(job);
+ }
Review Comment:
🤖 nit: the comment says "after splits are generated", but since this cleanup
now lives in a `finally` block, it also runs when split generation fails —
which is the whole point of the fix. Could you update the comment to something
like the following?

```java
// Clear work from ThreadLocal after getSplits completes (successfully or not),
// in case the thread is reused in a pool.
```
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]