[ https://issues.apache.org/jira/browse/MAPREDUCE-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585030#comment-17585030 ]

ASF GitHub Bot commented on MAPREDUCE-7370:
-------------------------------------------

cnauroth commented on code in PR #4248:
URL: https://github.com/apache/hadoop/pull/4248#discussion_r955436533


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java:
##########
@@ -70,6 +76,20 @@ public void testWithCounters() throws Exception {
     _testMOWithJavaSerialization(true);
   }
 
+  @Test(expected=IOException.class)
+  public void testParallelClose() throws IOException, InterruptedException {

Review Comment:
   I suggest naming this `testParallelCloseIOException` to make it clear that 
we are testing an error case.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -527,9 +558,41 @@ public void collect(Object key, Object value) throws IOException {
    * @throws java.io.IOException thrown if any of the MultipleOutput files
    *                             could not be closed properly.
    */
-  public void close() throws IOException {
+  public void close() throws IOException, InterruptedException {
+    int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
+        MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
+    AtomicBoolean encounteredException = new AtomicBoolean(false);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
+        .setUncaughtExceptionHandler(

Review Comment:
   `IOException` is now being propagated back to the caller of `close()`, but 
any unexpected (unchecked) exceptions would be swallowed in this uncaught 
exception handler. This is different from the existing code, where the caller 
would receive the unchecked exception.
   
   I think the best we can do here is to set `encounteredException` from within 
the uncaught exception handler, resulting in throwing the `IOException` at the 
bottom of the method. The message would need to be generalized to "encountered 
exception during close", not mentioning `IOException`, because it might also 
have been some other unchecked exception.
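   A minimal sketch of the suggested approach, assuming the writers can be modeled as plain `java.io.Closeable` objects (the class `ParallelCloser` and the method `closeAll` are hypothetical, not part of Hadoop). Any failure, checked or unchecked, sets `encounteredException`, and one generalized `IOException` is thrown at the end. Because `invokeAll` runs each task inside a `Future`, an exception thrown by the task is captured by that `Future` rather than reaching the thread's uncaught exception handler, so this sketch catches inside the task instead.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class ParallelCloser {

  /** Hypothetical helper: closes every resource in parallel, then reports any failure. */
  public static void closeAll(Collection<? extends Closeable> resources, int nThreads)
      throws IOException, InterruptedException {
    AtomicBoolean encounteredException = new AtomicBoolean(false);
    List<Callable<Object>> tasks = new ArrayList<>(resources.size());
    for (Closeable resource : resources) {
      tasks.add(() -> {
        try {
          resource.close();
        } catch (IOException | RuntimeException e) {
          // Record checked and unchecked failures alike; a real implementation would likely log e.
          encounteredException.set(true);
        }
        return null;
      });
    }
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
    try {
      // invokeAll returns only after every task has completed.
      executor.invokeAll(tasks);
    } finally {
      executor.shutdown();
    }
    if (encounteredException.get()) {
      // Generalized message: the cause may not have been an IOException.
      throw new IOException("encountered exception during close");
    }
  }
}
```

   Catching inside the task keeps the pool threads alive and still surfaces the failure to the caller of `closeAll`.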



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();

Review Comment:
   Sorry, I think I gave some bad advice here. I see now that you're using 
`invokeAll`, and that method only returns after all invocations complete. 
Therefore, we know the work is all done, and we can proceed to `shutdown`.
   
   Calling `awaitTermination` opens up a new problem: how to decide on the 
timeout, here arbitrarily chosen as 50 seconds. Since we don't really need 
`awaitTermination`, we might as well remove it and avoid that problem.
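   A small illustration of this point using only `java.util.concurrent` (the class `InvokeAllThenShutdown` and the method `runAll` are hypothetical): the `ExecutorService.invokeAll` contract guarantees that every returned `Future` is done (completed normally, exceptionally, or cancelled) when the call returns, so a plain `shutdown()` is enough and there is no timeout to choose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InvokeAllThenShutdown {

  /** Runs all tasks and returns how many futures were already done when invokeAll returned. */
  public static int runAll(int taskCount, int nThreads) throws InterruptedException {
    List<Callable<Integer>> tasks = new ArrayList<>();
    for (int i = 0; i < taskCount; i++) {
      final int id = i;
      tasks.add(() -> {
        TimeUnit.MILLISECONDS.sleep(10); // simulate a slow close()
        return id;
      });
    }
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
    int done = 0;
    try {
      // Blocks until every task has finished; no awaitTermination timeout is needed afterwards.
      List<Future<Integer>> futures = executor.invokeAll(tasks);
      for (Future<Integer> f : futures) {
        if (f.isDone()) {
          done++;
        }
      }
    } finally {
      // All work is already complete, so shutdown() only releases the idle threads.
      executor.shutdown();
    }
    return done;
  }
}
```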





> Parallelize MultipleOutputs#close call
> --------------------------------------
>
>                 Key: MAPREDUCE-7370
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7370
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>    Affects Versions: 3.3.0
>            Reporter: Prabhu Joseph
>            Assignee: groot
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> This call takes a long time when there are many files to close and each close 
> has high latency. Parallelize the MultipleOutputs#close call to improve its 
> speed.
> {code}
>   public void close() throws IOException {
>     for (RecordWriter writer : recordWriters.values()) {
>       writer.close(null);
>     }
>   }
> {code}
> The idea is from [~ste...@apache.org]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)