[ https://issues.apache.org/jira/browse/FLUME-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hari Shreedharan updated FLUME-1748:
------------------------------------

    Attachment: FLUME-1748.patch

Simple patch; I have not run the unit tests yet and will update it later if 
necessary. Let me know if you don't agree with this approach.
                
> HDFS Sink can end up in a bad state if the Callables are interrupted.
> ---------------------------------------------------------------------
>
>                 Key: FLUME-1748
>                 URL: https://issues.apache.org/jira/browse/FLUME-1748
>             Project: Flume
>          Issue Type: Bug
>            Reporter: Hari Shreedharan
>         Attachments: FLUME-1748.patch
>
>
> If one or more HDFS Sink write/flush/close calls take too long, the thread 
> is interrupted by the future.cancel(true) call. The Hadoop RPC client 
> re-sets the interrupt flag on a thread that is interrupted. Likewise, if the 
> thread is interrupted after the RPC call but before the call() method 
> returns, the interrupt flag stays set on the thread. A subsequent HDFS file 
> open call then fails with an exception of this sort:
> {code}
> [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
> java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020;
>       at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1164)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>       at $Proxy9.create(Unknown Source)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>       at java.lang.reflect.Method.invoke(Method.java:597)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>       at $Proxy9.create(Unknown Source)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
>       at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
>       at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
>       at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
>       at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
>       at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
>       at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
>       at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
>       at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
>       at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
>       at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
>       at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
>       at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
>       at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
>       at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>       at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
>       at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
>       at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
>       at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
>       at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>       at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedByInterruptException
>       at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
>       at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
>       at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
>       at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
>       at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
>       at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
>       at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
>       at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1140)
>       ... 36 more
> {code}
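> The root cause is straightforward to reproduce outside Flume. Below is a 
> minimal, self-contained sketch (not Flume or Hadoop code): once a thread's 
> interrupt flag is set, its next interruptible NIO operation fails 
> immediately with ClosedByInterruptException, which is exactly the 
> "Caused by" shown above.
> {code}
> import java.net.InetSocketAddress;
> import java.nio.channels.SocketChannel;
>
> public class InterruptFlagDemo {
>   public static void main(String[] args) throws Exception {
>     // Leave the interrupt flag set, as Hadoop's Client#call does after
>     // an interrupted wait (and as a racy future.cancel(true) can).
>     Thread.currentThread().interrupt();
>
>     // The next interruptible NIO call on this thread fails fast:
>     // AbstractInterruptibleChannel.begin() sees the pending interrupt,
>     // closes the channel, and end() throws ClosedByInterruptException.
>     SocketChannel ch = SocketChannel.open();
>     try {
>       ch.connect(new InetSocketAddress("example.com", 80));
>       System.out.println("connected (unexpected)");
>     } catch (Exception e) {
>       System.out.println("connect failed: " + e);
>     } finally {
>       ch.close();
>     }
>   }
> }
> {code}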
> The relevant code that re-sets the interrupt flag is in 
> http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
> The method is Client#call(RPC.RpcKind, Writable, ConnectionId). Note that 
> the InterruptedException is caught and the interrupt flag is re-set:
> {code}
>   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>       ConnectionId remoteId) throws InterruptedException, IOException {
>     Call call = new Call(rpcKind, rpcRequest);
>     Connection connection = getConnection(remoteId, call);
>     connection.sendParam(call);                 // send the parameter
>     boolean interrupted = false;
>     synchronized (call) {
>       while (!call.done) {
>         try {
>           call.wait();                           // wait for the result
>         } catch (InterruptedException ie) {
>           // save the fact that we were interrupted
>           interrupted = true;
>         }
>       }
>       if (interrupted) {
>         // set the interrupt flag now that we are done waiting
>         Thread.currentThread().interrupt();
>       }
>       if (call.error != null) {
>         if (call.error instanceof RemoteException) {
>           call.error.fillInStackTrace();
>           throw call.error;
>         } else { // local exception
>           InetSocketAddress address = connection.getRemoteAddress();
>           throw NetUtils.wrapException(address.getHostName(),
>                   address.getPort(),
>                   NetUtils.getHostname(),
>                   0,
>                   call.error);
>         }
>       } else {
>         return call.getRpcResult();
>       }
>     }
>   }
> {code}
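> One defensive pattern on the sink side, sketched below, is to clear a stale 
> interrupt flag before the executor thread runs the next HDFS operation. 
> This is only an illustration of the idea, not the attached patch; the class 
> name and error message are hypothetical.
> {code}
> import java.io.InterruptedIOException;
> import java.util.concurrent.Callable;
>
> // Hypothetical wrapper for the sink's HDFS callables (illustrative only).
> public final class InterruptSafeCallable<T> implements Callable<T> {
>   private final Callable<T> delegate;
>
>   public InterruptSafeCallable(Callable<T> delegate) {
>     this.delegate = delegate;
>   }
>
>   public T call() throws Exception {
>     // Thread.interrupted() both tests and clears the flag, so a stale
>     // interrupt left over from a previous future.cancel(true) cannot
>     // poison the interruptible NIO calls made by this task.
>     if (Thread.interrupted()) {
>       throw new InterruptedIOException(
>           "HDFS op scheduled on an interrupted thread; failing fast");
>     }
>     return delegate.call();
>   }
> }
> {code}
> The key distinction is Thread.interrupted(), which clears the status as a 
> side effect, versus Thread.currentThread().isInterrupted(), which only 
> reads it.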

