[ https://issues.apache.org/jira/browse/FLUME-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13508123#comment-13508123 ]

Hari Shreedharan commented on FLUME-1748:
-----------------------------------------

Adding some context:

Because all of the HDFS write operations happen inside synchronized calls, each 
thread must acquire the object monitor before it can write to HDFS. Since Hadoop 
RPC does not respond to interrupts until the call actually completes, the 
timeouts from Flume are pretty much useless. So if one of the calls takes far 
longer than the timeout, the futures for the other writes may be cancelled 
before their threads even acquire the monitor - those threads therefore end up 
with their interrupt flag set. The IO operations check the interrupt status and 
fail the operation if the flag is set, usually by throwing a 
ClosedByInterruptException.
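
A small standalone demonstration of the mechanism (hypothetical demo code, not 
from Flume): a thread blocked on a monitor does not respond to interrupt(), so 
the flag is still set once it finally gets the lock - just like a cancelled 
write waiting on the BucketWriter monitor.

{code}
public class MonitorInterruptDemo {
  private static final Object lock = new Object();

  public static void main(String[] args) throws Exception {
    Thread writer = new Thread(() -> {
      synchronized (lock) { // blocks here until main releases the lock
        // Monitor acquisition ignored the interrupt, so the flag survived:
        System.out.println("interrupted? "
            + Thread.currentThread().isInterrupted()); // prints true
      }
    });

    synchronized (lock) {
      writer.start();
      Thread.sleep(200);  // let the writer block on the monitor
      writer.interrupt(); // what future.cancel(true) does after a timeout
      Thread.sleep(200);  // writer is still blocked; flag stays set
    }
    writer.join();
  }
}
{code}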

The patch simply throws an exception that makes this clear.
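
Concretely, the guard is roughly of this shape (an illustrative sketch only - 
see the attached FLUME-1748.patch for the actual change):

{code}
// Illustrative sketch, not the patch itself: fail fast with a clear
// message if the thread was already interrupted by a timed-out
// future's cancel(true), before any HDFS operation is attempted.
private void checkAndThrowInterruptedException() throws InterruptedException {
  if (Thread.currentThread().isInterrupted()) {
    throw new InterruptedException("Interrupt flag was set before the HDFS "
        + "operation started, most likely because an earlier HDFS call "
        + "timed out and its future was cancelled.");
  }
}
{code}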
                
> HDFS Sink should check if the thread is interrupted before performing any HDFS operations
> -----------------------------------------------------------------------------------------
>
>                 Key: FLUME-1748
>                 URL: https://issues.apache.org/jira/browse/FLUME-1748
>             Project: Flume
>          Issue Type: Bug
>            Reporter: Hari Shreedharan
>            Assignee: Hari Shreedharan
>         Attachments: FLUME-1748.patch
>
>
> If one or more HDFS Sink write/flush/close calls take too long, the thread is 
> interrupted by the future.cancel(true) call. The Hadoop RPC client re-sets the 
> interrupt flag on a thread which was interrupted. Also, if the thread is 
> interrupted after the RPC call but before the call() method returns, the 
> interrupt flag stays on the thread. A subsequent HDFS file open call then 
> fails with an exception of this sort:
> {code}
> [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
> java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020; 
>       at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1164)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>       at $Proxy9.create(Unknown Source)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>       at java.lang.reflect.Method.invoke(Method.java:597)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>       at $Proxy9.create(Unknown Source)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
>       at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
>       at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
>       at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
>       at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
>       at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
>       at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
>       at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
>       at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
>       at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
>       at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
>       at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
>       at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
>       at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
>       at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>       at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
>       at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
>       at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
>       at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
>       at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>       at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedByInterruptException
>       at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
>       at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
>       at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
>       at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
>       at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
>       at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
>       at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
>       at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1140)
>       ... 36 more
> {code}
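> The root ClosedByInterruptException is ordinary java.nio behaviour rather 
> than anything Flume-specific: an interruptible channel operation attempted 
> on a thread whose interrupt flag is already set fails immediately. A tiny 
> standalone demo (hypothetical code, not from Flume):
> {code}
> import java.net.InetSocketAddress;
> import java.nio.channels.ClosedByInterruptException;
> import java.nio.channels.SocketChannel;
>
> public class ClosedByInterruptDemo {
>   public static void main(String[] args) throws Exception {
>     // Pretend a timed-out future already cancelled us with cancel(true):
>     Thread.currentThread().interrupt();
>     try (SocketChannel ch = SocketChannel.open()) {
>       // Any interruptible channel operation now fails straight away:
>       ch.connect(new InetSocketAddress("example.com", 80));
>     } catch (ClosedByInterruptException e) {
>       System.out.println("connect failed: " + e); // same root cause as above
>     }
>   }
> }
> {code}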
> The relevant code that re-sets the interrupt flag is in 
> http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
> in the method Client#call(RPC.RpcKind, Writable, ConnectionId). Note that the 
> InterruptedException is caught and the interrupt flag is re-set:
> {code}
>   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>       ConnectionId remoteId) throws InterruptedException, IOException {
>     Call call = new Call(rpcKind, rpcRequest);
>     Connection connection = getConnection(remoteId, call);
>     connection.sendParam(call);                 // send the parameter
>     boolean interrupted = false;
>     synchronized (call) {
>       while (!call.done) {
>         try {
>           call.wait();                           // wait for the result
>         } catch (InterruptedException ie) {
>           // save the fact that we were interrupted
>           interrupted = true;
>         }
>       }
>       if (interrupted) {
>         // set the interrupt flag now that we are done waiting
>         Thread.currentThread().interrupt();
>       }
>       if (call.error != null) {
>         if (call.error instanceof RemoteException) {
>           call.error.fillInStackTrace();
>           throw call.error;
>         } else { // local exception
>           InetSocketAddress address = connection.getRemoteAddress();
>           throw NetUtils.wrapException(address.getHostName(),
>                   address.getPort(),
>                   NetUtils.getHostname(),
>                   0,
>                   call.error);
>         }
>       } else {
>         return call.getRpcResult();
>       }
>     }
>   }
> {code}
