SourabhBadhya commented on code in PR #5418:
URL: https://github.com/apache/hive/pull/5418#discussion_r1759132288
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -307,27 +307,24 @@ public void commitJobs(List<JobContext> originalContextList, Operation operation
}
}
- private List<OutputTable> collectOutputs(List<JobContext> jobContextList) {
- return jobContextList.stream()
- .flatMap(jobContext -> HiveIcebergStorageHandler.outputTables(jobContext.getJobConf()).stream()
- .map(output -> new OutputTable(
- HiveIcebergStorageHandler.catalogName(jobContext.getJobConf(), output),
- output,
- SessionStateUtil.getResource(jobContext.getJobConf(), output)
+ private Multimap<OutputTable, JobContext> collectOutputs(List<JobContext> jobContextList) {
+ Multimap<OutputTable, JobContext> outputs =
Review Comment:
Why a Multimap? Until now we only considered one JobContext. Will this
generate multiple JobContexts per OutputTable?
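For context on the question, here is a minimal sketch of what a one-to-many grouping looks like. It uses only JDK collections in place of Guava's Multimap, and the table and job names are hypothetical. It does not reproduce the PR's collectOutputs logic; it only illustrates how several job contexts could end up keyed under a single output table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingSketch {
  // Groups (table, job) pairs so that each table maps to every job that wrote to it.
  // Strings stand in for OutputTable and JobContext (an assumption for brevity).
  static Map<String, List<String>> groupByTable(List<String[]> pairs) {
    Map<String, List<String>> outputs = new LinkedHashMap<>();
    for (String[] pair : pairs) {
      outputs.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
    }
    return outputs;
  }

  public static void main(String[] args) {
    // Hypothetical input: two jobs write to tbl_a, one job writes to tbl_b.
    List<String[]> pairs = List.of(
        new String[] {"db.tbl_a", "job_1"},
        new String[] {"db.tbl_a", "job_2"},
        new String[] {"db.tbl_b", "job_1"});
    Map<String, List<String>> outputs = groupByTable(pairs);
    for (Map.Entry<String, List<String>> e : outputs.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}
```

If every table only ever has one job context, each list holds a single element and a plain one-to-one map would be enough.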
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -346,43 +343,42 @@ public void abortJobs(List<JobContext> originalContextList) throws IOException {
List<JobContext> jobContextList = originalContextList.stream()
.map(TezUtil::enrichContextWithVertexId)
.collect(Collectors.toList());
- List<OutputTable> outputs = collectOutputs(jobContextList);
+ Multimap<OutputTable, JobContext> outputs = collectOutputs(jobContextList);
+ JobConf jobConf = jobContextList.get(0).getJobConf();
String ids = jobContextList.stream()
.map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
LOG.info("Job(s) {} are aborted. Data file cleaning started", ids);
- Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
+ ExecutorService fileExecutor = fileExecutor(jobConf);
+ ExecutorService tableExecutor = tableExecutor(jobConf, outputs.keySet().size());
- ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
- ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
+ Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
try {
// Cleans up the changes for the output tables in parallel
- Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
- .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext))))
+ Tasks.foreach(outputs.keySet())
.suppressFailureWhenFinished()
.executeWith(tableExecutor)
.onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
.run(output -> {
- JobContext jobContext = output.getValue();
- JobConf jobConf = jobContext.getJobConf();
- LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
-
- Table table = output.getKey();
- String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
- jobLocations.add(jobLocation);
- // list jobLocation to get number of forCommit files
- // we do this because map/reduce num in jobConf is unreliable and we have no access to vertex status info
- int numTasks = listForCommits(jobConf, jobLocation).size();
- FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext,
- table.io(), false);
- // Check if we have files already written and remove data and delta files if there are any
- Tasks.foreach(results.allFiles())
- .retry(3)
- .suppressFailureWhenFinished()
- .executeWith(fileExecutor)
- .onFailure((file, exc) -> LOG.warn("Failed to remove data file {} on abort job", file.path(), exc))
- .run(file -> table.io().deleteFile(file.path().toString()));
+ for (JobContext jobContext : outputs.get(output)) {
Review Comment:
Before, there was only one JobContext. Now there are multiple JobContexts. Why?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]