[ 
https://issues.apache.org/jira/browse/MAPREDUCE-7370?focusedWorklogId=790611&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-790611
 ]

ASF GitHub Bot logged work on MAPREDUCE-7370:
---------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Jul/22 22:16
            Start Date: 13/Jul/22 22:16
    Worklog Time Spent: 10m 
      Work Description: cnauroth commented on code in PR #4248:
URL: https://github.com/apache/hadoop/pull/4248#discussion_r920548705


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();

Review Comment:
   `shutdown` does not wait for the submitted tasks to finish, so when the 
`close()` method returns, it won't really be guaranteed that closing has 
completed. We'll need a call to `awaitTermination` to make sure all tasks have 
finished running.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();

Review Comment:
   Is this line left over from some manual testing?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;

Review Comment:
   I suggest making this configurable, with 10 as the default.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();

Review Comment:
   We know that we will generate exactly one callable for each `RecordWriter`. 
We can create the `ArrayList` pre-allocated to exactly the correct size, 
potentially avoiding reallocation inefficiencies: `new 
ArrayList<>(recordWriters.size())`



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();

Review Comment:
   You can log a warning here that closing was interrupted.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);

Review Comment:
   I recommend using the version of this method that accepts a `ThreadFactory`, 
and probably use `ThreadFactoryBuilder`. The factory should generate threads 
that 1) use a naming format that makes it clear these threads are related to 
the closing process (e.g. "MultipleOutputs-close"), and 2) set an 
`UncaughtExceptionHandler` that logs the exception, which would make visible 
unexpected errors like unchecked exceptions.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();
+    }
+
+    if (ioException.get() != null) {
+      throw new IOException(ioException.get());

Review Comment:
   With this approach, if multiple record writers throw an exception during 
close, we'll only get visibility into one of them. I'd like to suggest a 
slightly different approach. Within the callable, catch the exception, log it 
immediately and flag an `AtomicBoolean`. Then, on this line, if that 
`AtomicBoolean` is set, throw an `IOException` from the overall method, with a 
message like "One or more threads encountered IOException during close. See 
prior errors."





Issue Time Tracking
-------------------

    Worklog Id:     (was: 790611)
    Time Spent: 2h 10m  (was: 2h)

> Parallelize MultipleOutputs#close call
> --------------------------------------
>
>                 Key: MAPREDUCE-7370
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7370
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>    Affects Versions: 3.3.0
>            Reporter: Prabhu Joseph
>            Assignee: Ashutosh Gupta
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> This call takes more time when there are lot of files to close and there is a 
> high latency to close. Parallelize MultipleOutputs#close call to improve the 
> speed.
> {code}
>   public void close() throws IOException {
>     for (RecordWriter writer : recordWriters.values()) {
>       writer.close(null);
>     }
>   }
> {code}
> Idea is from [~ste...@apache.org]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org

Reply via email to