[ 
https://issues.apache.org/jira/browse/HDFS-9494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15058325#comment-15058325
 ] 

Rakesh R commented on HDFS-9494:
--------------------------------

Thanks [~demongaorui]. Overall the latest patch looks good apart from one 
comment. I'm adding a thought about the new exception handling logic; sorry 
for not identifying this point in my previous review.

With the proposed changes, it looks like we are no longer catching each 
streamer's failure and handling that streamer. Instead, the code either 
throws an InterruptedIOException OR just logs a warning. Is that intentional?
{code}
+      try {
+        executorCompletionService.take().get();
+      } catch (InterruptedException ie) {
+        throw DFSUtilClient.toInterruptedIOException(
+            "Interrupted during waiting all streamer flush, ", ie);
+      } catch (ExecutionException ee) {
+        LOG.warn(
+            "Caught ExecutionException while waiting all streamer flush, ", ee);
+      }
{code}
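Just to illustrate the concern with a standalone toy (plain JDK code; the class and method names are made up for this comment, not HDFS code): a failure inside a submitted task surfaces at {{take().get()}} only as an {{ExecutionException}} wrapping the original {{IOException}}, so the draining loop no longer knows which streamer failed unless the task itself recorded it.
{code}
import java.io.IOException;
import java.util.concurrent.*;

public class CompletionServiceFailureDemo {

    // Submits one failing and one succeeding "flush" task, drains the
    // completion service, and returns the message of the cause wrapped in
    // the ExecutionException (or null if nothing failed).
    static String drainAndReportFailure() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ExecutorCompletionService<Void> ecs =
            new ExecutorCompletionService<>(pool);
        ecs.submit(() -> { throw new IOException("streamer #1 flush failed"); });
        ecs.submit(() -> null); // a flush that succeeds
        String failure = null;
        for (int i = 0; i < 2; i++) {
            try {
                ecs.take().get();
            } catch (ExecutionException ee) {
                // Only the wrapped cause is visible here; the streamer
                // identity is lost unless the Callable recorded it itself.
                failure = ee.getCause().getMessage();
            }
        }
        pool.shutdown();
        return failure;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drainAndReportFailure()); // prints streamer #1 flush failed
    }
}
{code}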
Existing logic:

1# Iterate each streamer.
2# If there is a failure in the current streamer then 
{{handleStreamerFailure("flushInternal " + s, e);}}

New logic:

1# Iterate each streamer.
2# Flush it without waiting for the ack.
3# Wait for any streamer to complete. If one streamer failed, either throw an 
IOException back to the caller OR just warn and continue.

Here in step 3#, I think it would be good if we could handle each streamer's 
exception separately. One draft idea that comes to mind is to move the 
exception handling logic inside {{Callable#call()}} rather than letting the 
IOException from {{s.waitForAckedSeqno(toWaitFor);}} propagate out of the 
task. Also, you would need to refactor the {{handleStreamerFailure()}} 
function to take an extra {{StripedDataStreamer s}} parameter, like:

{code}
            public Void call() throws Exception {
              try {
                s.waitForAckedSeqno(toWaitFor);
              } catch (IOException ioe) {
                handleStreamerFailure(s, "flushInternal", ioe);
              }
              return null;
            }
{code}
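To make the draft concrete, here is a standalone toy version of the whole pattern (plain JDK code; {{flushAll}}, the streamer ids, and the failure list are invented for this comment and only stand in for {{flushAllInternals()}}, {{StripedDataStreamer}}, and {{handleStreamerFailure()}}): each {{Callable}} catches its own {{IOException}} and records the per-streamer failure, so the draining loop never has to unpack an {{ExecutionException}}.
{code}
import java.io.IOException;
import java.util.List;
import java.util.concurrent.*;

public class PerStreamerFlushDemo {

    // Flushes 'streamers' tasks in parallel; the task whose id equals
    // failingStreamer simulates a failed waitForAckedSeqno(). Each task
    // handles its own IOException (the handleStreamerFailure(s, ...) idea
    // above), so the draining loop below never sees an ExecutionException.
    static List<String> flushAll(int streamers, int failingStreamer)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(streamers);
        ExecutorCompletionService<Void> ecs =
            new ExecutorCompletionService<>(pool);
        List<String> failed = new CopyOnWriteArrayList<>();
        for (int i = 0; i < streamers; i++) {
            final int id = i;
            ecs.submit(() -> {
                try {
                    if (id == failingStreamer) { // stands in for s.waitForAckedSeqno(toWaitFor)
                        throw new IOException("ack timeout");
                    }
                } catch (IOException ioe) {
                    // per-streamer handling, in place of
                    // handleStreamerFailure(s, "flushInternal", ioe)
                    failed.add("streamer " + id + ": " + ioe.getMessage());
                }
                return null;
            });
        }
        for (int i = 0; i < streamers; i++) {
            try {
                ecs.take().get();
            } catch (ExecutionException ee) {
                throw new AssertionError("should not happen now", ee);
            }
        }
        pool.shutdown();
        return failed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(flushAll(3, 1)); // prints [streamer 1: ack timeout]
    }
}
{code}
This way the caller of {{flushAllInternals()}} still gets a normal return, and each failed streamer has already been handed to the failure-handling path individually.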

> Parallel optimization of DFSStripedOutputStream#flushAllInternals( )
> --------------------------------------------------------------------
>
>                 Key: HDFS-9494
>                 URL: https://issues.apache.org/jira/browse/HDFS-9494
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: GAO Rui
>            Assignee: GAO Rui
>            Priority: Minor
>         Attachments: HDFS-9494-origin-trunk.00.patch, 
> HDFS-9494-origin-trunk.01.patch, HDFS-9494-origin-trunk.02.patch
>
>
> Currently, in DFSStripedOutputStream#flushAllInternals( ), we trigger and 
> wait for flushInternal( ) in sequence. So the runtime flow is like:
> {code}
> Streamer0#flushInternal( )
> Streamer0#waitForAckedSeqno( )
> Streamer1#flushInternal( )
> Streamer1#waitForAckedSeqno( )
> …
> Streamer8#flushInternal( )
> Streamer8#waitForAckedSeqno( )
> {code}
> It could be better to trigger flushInternal( ) on all the streamers, wait 
> for all of them to return from waitForAckedSeqno( ), and then have 
> flushAllInternals( ) return.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
