[ https://issues.apache.org/jira/browse/MAPREDUCE-7370?focusedWorklogId=790611&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-790611 ]
ASF GitHub Bot logged work on MAPREDUCE-7370:
---------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Jul/22 22:16
            Start Date: 13/Jul/22 22:16
    Worklog Time Spent: 10m

Work Description: cnauroth commented on code in PR #4248:
URL: https://github.com/apache/hadoop/pull/4248#discussion_r920548705


##########
File: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java
##########

@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws IOException {
    * could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = 10;
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    List<Callable<Object>> callableList = new ArrayList<>();
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+          throw new IOException();
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();
+    }
+
+    if (ioException.get() != null) {
+      throw new IOException(ioException.get());

Review Comment (on `executorService.shutdown();`):
   `shutdown` does not wait for the submitted tasks to finish, so when the
   `close()` method returns, it won't really be guaranteed that closing has
   completed. We'll need a call to `awaitTermination` to make sure all tasks
   have finished running.

Review Comment (same file and hunk, on `throw new IOException();`):
   Is this line left over from some manual testing?

Review Comment (same file and hunk, on `int nThreads = 10;`):
   I suggest making this configurable, with 10 as the default.

Review Comment (same file and hunk, on `List<Callable<Object>> callableList = new ArrayList<>();`):
   We know that we will generate exactly one callable for each `RecordWriter`.
   We can create the `ArrayList` pre-allocated to exactly the correct size,
   potentially avoiding reallocation inefficiencies:
   `new ArrayList<>(recordWriters.size())`

Review Comment (same file and hunk, on `Thread.currentThread().interrupt();`):
   You can log a warning here that closing was interrupted.

Review Comment (same file and hunk, on `ExecutorService executorService = Executors.newFixedThreadPool(nThreads);`):
   I recommend using the version of this method that accepts a `ThreadFactory`,
   and probably use `ThreadFactoryBuilder`. The factory should generate threads
   that 1) use a naming format that makes it clear these threads are related to
   the closing process (e.g. "MultipleOutputs-close"), and 2) set an
   `UncaughtExceptionHandler` that logs the exception, which would make visible
   unexpected errors like unchecked exceptions.

Review Comment (same file and hunk, on `throw new IOException(ioException.get());`):
   With this approach, if multiple record writers throw an exception during
   close, we'll only get visibility into one of them. I'd like to suggest a
   slightly different approach. Within the callable, catch the exception, log
   it immediately and flag an `AtomicBoolean`. Then, on this line, if that
   `AtomicBoolean` is set, throw an `IOException` from the overall method, with
   a message like "One or more threads encountered IOException during close.
   See prior errors."
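Taken together, the review comments above describe one coherent shape for the parallel close: a pool built from a naming `ThreadFactory` with an `UncaughtExceptionHandler`, a pre-sized task list, per-task logging with an `AtomicBoolean` failure flag, and `awaitTermination` after `shutdown`. The following is a simplified, self-contained sketch of that shape, not the actual MultipleOutputs patch: the `Writer` interface stands in for Hadoop's `RecordWriter`, a plain `ThreadFactory` lambda stands in for Guava's `ThreadFactoryBuilder`, and the class/method names are invented for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelCloseSketch {

  /** Stand-in for Hadoop's RecordWriter (hypothetical, for this sketch only). */
  public interface Writer {
    void close() throws IOException;
  }

  public static void closeAll(List<Writer> writers, int nThreads) throws IOException {
    AtomicBoolean sawException = new AtomicBoolean(false);
    AtomicInteger id = new AtomicInteger();

    // Named threads plus an UncaughtExceptionHandler, so unexpected unchecked
    // exceptions are logged instead of disappearing with a pool thread.
    ThreadFactory factory = r -> {
      Thread t = new Thread(r, "MultipleOutputs-close-" + id.getAndIncrement());
      t.setUncaughtExceptionHandler((thread, e) ->
          System.err.println(thread.getName() + " failed: " + e));
      return t;
    };
    ExecutorService pool = Executors.newFixedThreadPool(nThreads, factory);

    // Exactly one callable per writer, so pre-size the list.
    List<Callable<Object>> tasks = new ArrayList<>(writers.size());
    for (Writer writer : writers) {
      tasks.add(() -> {
        try {
          writer.close();
        } catch (IOException e) {
          // Log each failure immediately and flag it, so no failure is hidden.
          System.err.println("Error while closing writer: " + e.getMessage());
          sawException.set(true);
        }
        return null;
      });
    }

    try {
      pool.invokeAll(tasks); // blocks until all tasks complete (or interruption)
    } catch (InterruptedException e) {
      System.err.println("Interrupted while closing writers");
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdown();
      try {
        // shutdown() alone does not wait for running tasks; block until done.
        pool.awaitTermination(1, TimeUnit.MINUTES);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    if (sawException.get()) {
      throw new IOException(
          "One or more threads encountered IOException during close. See prior errors.");
    }
  }
}
```

Note that `invokeAll` already waits for all submitted tasks on the happy path; `awaitTermination` matters on the interrupted path, where `shutdown()` alone would let `closeAll` return while writers are still closing.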
Issue Time Tracking
-------------------

    Worklog Id:     (was: 790611)
    Time Spent: 2h 10m  (was: 2h)

> Parallelize MultipleOutputs#close call
> --------------------------------------
>
>                 Key: MAPREDUCE-7370
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7370
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>    Affects Versions: 3.3.0
>              Reporter: Prabhu Joseph
>              Assignee: Ashutosh Gupta
>              Priority: Major
>                Labels: pull-request-available
>            Time Spent: 2h 10m
>    Remaining Estimate: 0h
>
> This call takes more time when there are a lot of files to close and there is
> high latency in closing each one. Parallelize the MultipleOutputs#close call
> to improve the speed.
> {code}
> public void close() throws IOException {
>   for (RecordWriter writer : recordWriters.values()) {
>     writer.close(null);
>   }
> }
> {code}
> Idea is from [~ste...@apache.org]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org