[
https://issues.apache.org/jira/browse/FLUME-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13507037#comment-13507037
]
Hari Shreedharan commented on FLUME-1748:
-----------------------------------------
It looks like a previous Hadoop RPC through the same bucket writer taking too
long can cause this issue, because several threads end up blocked on the same
method. The blocked HDFS sink threads are eventually interrupted by the
future.cancel call, but they still proceed into the RPC calls, only to be hit
by a series of exceptions.
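For context, here is a minimal sketch of the cancellation pattern involved
(the class name, pool, and timeout value are hypothetical, not the sink's
actual configuration): the sink submits the bucket writer operation as a
Callable, and when the result does not arrive in time, future.cancel(true)
delivers the interrupt to the worker thread while it may still be inside a
Hadoop RPC.
{code}
import java.util.concurrent.*;

public class CancelSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService callTimeoutPool = Executors.newSingleThreadExecutor();

    // Stand-in for a BucketWriter append/flush/close stuck in a slow RPC.
    Future<Void> future = callTimeoutPool.submit(new Callable<Void>() {
      public Void call() throws Exception {
        Thread.sleep(60000); // simulates the slow Hadoop RPC
        return null;
      }
    });

    try {
      future.get(1, TimeUnit.SECONDS); // hypothetical call timeout
    } catch (TimeoutException e) {
      // This is the interrupt that the Hadoop RPC client later re-sets on
      // the worker thread instead of letting it propagate.
      future.cancel(true);
    }
    callTimeoutPool.shutdown();
  }
}
{code}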
> HDFS Sink can end up in a bad state if the Callables are interrupted.
> ---------------------------------------------------------------------
>
> Key: FLUME-1748
> URL: https://issues.apache.org/jira/browse/FLUME-1748
> Project: Flume
> Issue Type: Bug
> Reporter: Hari Shreedharan
>
> If one or more HDFS Sink write/flush/close calls take too long, the thread
> is interrupted by the future.cancel(true) call. The Hadoop RPC client
> re-sets the interrupt flag on a thread that is interrupted. Also, if the
> thread is interrupted after the RPC call but before the call() method
> returns, the interrupt flag stays on the thread. A subsequent HDFS file open
> call then leads to an exception of this sort:
> {code}
> [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457) - HDFS IO error
> java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020;
>     at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1164)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>     at $Proxy9.create(Unknown Source)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>     at $Proxy9.create(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
>     at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
>     at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
>     at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
>     at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
>     at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
>     at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
>     at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
>     at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>     at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
>     at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedByInterruptException
>     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
>     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
>     at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
>     at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
>     at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
>     at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
>     at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
>     at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
>     at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1140)
>     ... 36 more
> {code}
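> To see why a leftover interrupt flag is enough to produce the
> ClosedByInterruptException above, here is a minimal, self-contained
> illustration (the class name, host, and port are placeholders): an
> interruptible NIO channel operation checks the thread's interrupt status on
> entry and closes the channel if it is set, matching the
> SocketChannelImpl.connect frame in the trace.
> {code}
> import java.net.InetSocketAddress;
> import java.nio.channels.ClosedByInterruptException;
> import java.nio.channels.SocketChannel;
>
> public class InterruptFlagSketch {
>   public static void main(String[] args) throws Exception {
>     // Set the interrupt flag without blocking, just as Client#call does
>     // after being interrupted while waiting on the RPC result.
>     Thread.currentThread().interrupt();
>
>     SocketChannel channel = SocketChannel.open();
>     try {
>       // The pre-set flag makes this fail immediately.
>       channel.connect(new InetSocketAddress("example.com", 8020));
>     } catch (ClosedByInterruptException e) {
>       System.out.println("connect failed: " + e);
>     }
>   }
> }
> {code}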
> The relevant code that re-sets the interrupt flag is in
> http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
> The method is Client#call(RPC.RpcKind, Writable, ConnectionId). Note that
> the InterruptedException is caught and the interrupt flag is re-set before
> the method returns or throws.
> {code}
> public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>     ConnectionId remoteId) throws InterruptedException, IOException {
>   Call call = new Call(rpcKind, rpcRequest);
>   Connection connection = getConnection(remoteId, call);
>   connection.sendParam(call);          // send the parameter
>   boolean interrupted = false;
>   synchronized (call) {
>     while (!call.done) {
>       try {
>         call.wait();                   // wait for the result
>       } catch (InterruptedException ie) {
>         // save the fact that we were interrupted
>         interrupted = true;
>       }
>     }
>     if (interrupted) {
>       // set the interrupt flag now that we are done waiting
>       Thread.currentThread().interrupt();
>     }
>     if (call.error != null) {
>       if (call.error instanceof RemoteException) {
>         call.error.fillInStackTrace();
>         throw call.error;
>       } else { // local exception
>         InetSocketAddress address = connection.getRemoteAddress();
>         throw NetUtils.wrapException(address.getHostName(),
>             address.getPort(),
>             NetUtils.getHostname(),
>             0,
>             call.error);
>       }
>     } else {
>       return call.getRpcResult();
>     }
>   }
> }
> {code}
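> One possible direction, not necessarily the eventual fix, would be for the
> sink's callables to clear a leftover interrupt flag before issuing a new
> RPC, so a previously cancelled call cannot poison the next file open. A
> hedged sketch follows; the wrapper class and its logging are hypothetical.
> {code}
> import java.util.concurrent.Callable;
>
> public class ClearInterruptSketch {
>   // Hypothetical wrapper: clear any interrupt flag left by a cancelled
>   // future before the wrapped work enters an interruptible Hadoop RPC.
>   static <T> Callable<T> clearingInterrupt(final Callable<T> work) {
>     return new Callable<T>() {
>       public T call() throws Exception {
>         if (Thread.interrupted()) { // reads *and* clears the flag
>           System.err.println("Leftover interrupt from cancelled call; cleared.");
>         }
>         return work.call();
>       }
>     };
>   }
> }
> {code}
> Whether clearing or propagating the interrupt is the right choice depends
> on whether the BucketWriter should be recreated after a cancelled call.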
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira