cnauroth commented on code in PR #4248:
URL: https://github.com/apache/hadoop/pull/4248#discussion_r920548705


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();

Review Comment:
   `shutdown` does not wait for the submitted tasks to finish, so when the 
`close()` method returns, it won't really be guaranteed that closing has 
completed. We'll need a call to `awaitTermination` to make sure all tasks have 
finished running.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();

Review Comment:
   Is this line left over from some manual testing?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;

Review Comment:
   I suggest making this configurable, with 10 as the default.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();

Review Comment:
   We know that we will generate exactly one callable for each `RecordWriter`. 
We can create the `ArrayList` pre-allocated to exactly the correct size, 
potentially avoiding reallocation inefficiencies: `new 
ArrayList<>(recordWriters.size())`



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();

Review Comment:
   You can log a warning here that closing was interrupted.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);

Review Comment:
   I recommend using the version of this method that accepts a `ThreadFactory`, 
and probably use `ThreadFactoryBuilder`. The factory should generate threads 
that 1) use a naming format that makes it clear these threads are related to 
the closing process (e.g. "MultipleOutputs-close"), and 2) set an 
`UncaughtExceptionHandler` that logs the exception, which would make visible 
unexpected errors like unchecked exceptions.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws 
IOException {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();
+    }
+
+    if (ioException.get() != null) {
+      throw new IOException(ioException.get());

Review Comment:
   With this approach, if multiple record writers throw an exception during 
close, we'll only get visibility into one of them. I'd like to suggest a 
slightly different approach. Within the callable, catch the exception, log it 
immediately and flag an `AtomicBoolean`. Then, on this line, if that 
`AtomicBoolean` is set, throw an `IOException` from the overall method, with a 
message like "One or more threads encountered IOException during close. See 
prior errors."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to