cnauroth commented on code in PR #4248:
URL: https://github.com/apache/hadoop/pull/4248#discussion_r920548705
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
    * could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();

Review Comment:
   `shutdown` does not wait for the submitted tasks to finish, so when the `close()` method returns, it won't really be guaranteed that closing has completed. We'll need a call to `awaitTermination` to make sure all tasks have finished running.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
    * could not be closed properly.
   */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();

Review Comment:
   Is this line left over from some manual testing?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
    * could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;

Review Comment:
   I suggest making this configurable, with 10 as the default.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
    * could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();

Review Comment:
   We know that we will generate exactly one callable for each `RecordWriter`.
   We can create the `ArrayList` pre-allocated to exactly the correct size, potentially avoiding reallocation inefficiencies: `new ArrayList<>(recordWriters.size())`



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
    * could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();

Review Comment:
   You can log a warning here that closing was interrupted.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
    * could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);

Review Comment:
   I recommend using the version of this method that accepts a `ThreadFactory`, and probably use `ThreadFactoryBuilder`. The factory should generate threads that 1) use a naming format that makes it clear these threads are related to the closing process (e.g.
   "MultipleOutputs-close"), and 2) set an `UncaughtExceptionHandler` that logs the exception, which would make visible unexpected errors like unchecked exceptions.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
    * could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();
+    }
+
+    if (ioException.get() != null) {
+      throw new IOException(ioException.get());

Review Comment:
   With this approach, if multiple record writers throw an exception during close, we'll only get visibility into one of them. I'd like to suggest a slightly different approach. Within the callable, catch the exception, log it immediately and flag an `AtomicBoolean`. Then, on this line, if that `AtomicBoolean` is set, throw an `IOException` from the overall method, with a message like "One or more threads encountered IOException during close. See prior errors."


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
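[Editor's note] Pulled together, the review suggestions above (a `ThreadFactory` with descriptive names and an `UncaughtExceptionHandler`, a pre-sized callable list, log-and-flag error handling via `AtomicBoolean`, and `awaitTermination` after `shutdown`) might look roughly like the sketch below. This is illustrative only, not the actual patch: the `Writer` interface is a minimal stand-in for `org.apache.hadoop.mapred.RecordWriter` so the example is self-contained, the class name and timeout are invented, the plain lambda `ThreadFactory` approximates what Guava's `ThreadFactoryBuilder` would build, and `System.err` stands in for a real logger.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class MultipleOutputsCloseSketch {

  /** Minimal stand-in for RecordWriter; close() may fail with IOException. */
  interface Writer {
    void close() throws IOException;
  }

  private final Map<String, Writer> recordWriters;
  private final int nThreads; // reviewer: make this configurable, default 10

  MultipleOutputsCloseSketch(Map<String, Writer> recordWriters, int nThreads) {
    this.recordWriters = recordWriters;
    this.nThreads = nThreads;
  }

  public void close() throws IOException {
    AtomicBoolean encounteredException = new AtomicBoolean(false);

    // Named threads make the close pool identifiable in thread dumps; the
    // UncaughtExceptionHandler surfaces unexpected unchecked exceptions.
    AtomicInteger threadId = new AtomicInteger();
    ThreadFactory factory = r -> {
      Thread t = new Thread(r, "MultipleOutputs-close-" + threadId.getAndIncrement());
      t.setUncaughtExceptionHandler(
          (thread, e) -> System.err.println("Uncaught in " + thread.getName() + ": " + e));
      return t;
    };
    ExecutorService executorService = Executors.newFixedThreadPool(nThreads, factory);

    // Pre-size the list: exactly one callable per RecordWriter.
    List<Callable<Object>> callableList = new ArrayList<>(recordWriters.size());
    for (Writer writer : recordWriters.values()) {
      callableList.add(() -> {
        try {
          writer.close();
        } catch (IOException e) {
          // Log each failure immediately so none is lost, then flag it.
          System.err.println("IOException while closing writer: " + e.getMessage());
          encounteredException.set(true);
        }
        return null;
      });
    }
    try {
      executorService.invokeAll(callableList); // blocks until all tasks complete
    } catch (InterruptedException e) {
      System.err.println("Interrupted while closing record writers");
      Thread.currentThread().interrupt();
    } finally {
      executorService.shutdown();
      try {
        // shutdown() alone does not wait; awaitTermination ensures the close
        // tasks have actually finished before close() returns.
        executorService.awaitTermination(1, TimeUnit.MINUTES);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    if (encounteredException.get()) {
      throw new IOException(
          "One or more threads encountered IOException during close. See prior errors.");
    }
  }
}
```

Note that `invokeAll` itself blocks until every submitted task finishes, so `awaitTermination` mainly matters on the interrupted path, where some close tasks may still be winding down when `invokeAll` returns early.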