[ https://issues.apache.org/jira/browse/MAPREDUCE-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585030#comment-17585030 ]
ASF GitHub Bot commented on MAPREDUCE-7370:
-------------------------------------------

cnauroth commented on code in PR #4248:
URL: https://github.com/apache/hadoop/pull/4248#discussion_r955436533


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java:
##########

@@ -70,6 +76,20 @@
   public void testWithCounters() throws Exception {
     _testMOWithJavaSerialization(true);
   }

+  @Test(expected=IOException.class)
+  public void testParallelClose() throws IOException, InterruptedException {

Review Comment:
   I suggest naming this `testParallelCloseIOException` to make it clear that we are testing an error case.


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########

@@ -527,9 +558,41 @@
    * @throws java.io.IOException thrown if any of the MultipleOutput files
    * could not be closed properly.
    */
-  public void close() throws IOException {
+  public void close() throws IOException, InterruptedException {
+    int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
+        MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
+    AtomicBoolean encounteredException = new AtomicBoolean(false);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
+        .setUncaughtExceptionHandler(

Review Comment:
   `IOException` is now being propagated back to the caller of `close()`, but any unexpected (unchecked) exceptions would be swallowed in this uncaught exception handler. This is different from the existing code, where the caller would receive the unchecked exception. I think the best we can do here is to set `encounteredException` from within the uncaught exception handler, resulting in throwing the `IOException` at the bottom of the method.
   The message would need to be generalized to "encountered exception during close", not mentioning `IOException`, because it might also have been some other unchecked exception.


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########

@@ -528,8 +545,33 @@
    * could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();

Review Comment:
   Sorry, I think I gave some bad advice here. I see now that you're using `invokeAll`, and that method only returns after all invocations complete. Therefore, we know the work is all done, and we can proceed to `shutdown`. Calling `awaitTermination` opens up a new problem: how to decide on the timeout, here arbitrarily chosen as 50 seconds. Since we don't really need `awaitTermination`, we might as well remove it and avoid that problem.
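Taken together, the review comments above describe a `close()` that propagates the first `IOException` to the caller, does not silently swallow unchecked exceptions, and relies on `invokeAll` returning only after all tasks finish, so a plain `shutdown` suffices. A minimal standalone sketch of that pattern follows; the `Writer` interface, `closeAll` helper, and thread count are illustrative stand-ins and not the Hadoop `MultipleOutputs` API. As a simplification, unchecked exceptions are caught inside each `Callable` rather than via a `ThreadFactory` uncaught-exception handler (with `Callable` tasks, throwables are captured by the returned `Future`s and never reach the uncaught handler):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class ParallelCloseSketch {

    /** Hypothetical stand-in for Hadoop's RecordWriter. */
    interface Writer {
        void close() throws IOException;
    }

    /**
     * Closes all writers on a fixed-size pool. The first IOException is
     * recorded and rethrown; any unchecked exception flips a flag that is
     * converted to a generic "encountered exception during close" error,
     * as the review suggests.
     */
    static void closeAll(Collection<Writer> writers, int nThreads)
            throws IOException, InterruptedException {
        AtomicReference<IOException> ioException = new AtomicReference<>();
        AtomicBoolean encounteredException = new AtomicBoolean(false);
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        List<Callable<Object>> tasks = new ArrayList<>();
        for (Writer w : writers) {
            tasks.add(() -> {
                try {
                    w.close();
                } catch (IOException e) {
                    ioException.compareAndSet(null, e);  // keep only the first
                } catch (RuntimeException e) {
                    encounteredException.set(true);      // don't swallow silently
                }
                return null;
            });
        }
        try {
            // invokeAll returns only after every task has completed, so a
            // plain shutdown is enough -- no awaitTermination, and therefore
            // no arbitrary timeout to choose.
            pool.invokeAll(tasks);
        } finally {
            pool.shutdown();
        }
        if (ioException.get() != null) {
            throw ioException.get();
        }
        if (encounteredException.get()) {
            throw new IOException("Encountered exception during close");
        }
    }

    public static void main(String[] args) throws Exception {
        List<Writer> ok = Arrays.asList(() -> { }, () -> { });
        closeAll(ok, 2);
        System.out.println("all closed");

        List<Writer> bad = Arrays.asList(
                () -> { }, () -> { throw new IOException("disk gone"); });
        try {
            closeAll(bad, 2);
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Note that `compareAndSet(null, e)` keeps only the first failure, matching the single-exception contract of `close()`; later failures are intentionally dropped rather than chained.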
> Parallelize MultipleOutputs#close call
> --------------------------------------
>
>                 Key: MAPREDUCE-7370
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7370
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>    Affects Versions: 3.3.0
>            Reporter: Prabhu Joseph
>            Assignee: groot
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> This call takes more time when there are a lot of files to close and there is
> high latency on each close. Parallelize the MultipleOutputs#close call to
> improve the speed.
> {code}
> public void close() throws IOException {
>   for (RecordWriter writer : recordWriters.values()) {
>     writer.close(null);
>   }
> }
> {code}
> Idea is from [~ste...@apache.org]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org