[ https://issues.apache.org/jira/browse/FLUME-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hari Shreedharan updated FLUME-1748:
------------------------------------

    Attachment: FLUME-1748.patch

Simple patch; I have not run the unit tests yet and will update the patch later if necessary. Let me know if you don't agree with this approach.
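To illustrate the idea, here is a minimal sketch (this is not the attached patch; the wrapper class name is mine): clear the worker thread's interrupt status when a submitted HDFS call finishes, so a cancelled task cannot leave the flag set for whatever runs next on that pooled thread.

{code}
import java.util.concurrent.Callable;

/**
 * Wraps a callable so the worker thread's interrupt status is cleared
 * when the task finishes. This keeps a future.cancel(true), or the
 * Hadoop RPC client re-setting the flag, from leaving the pooled thread
 * interrupted for the next HDFS open/append/close scheduled on it.
 */
public final class InterruptClearingCallable<T> implements Callable<T> {

  private final Callable<T> delegate;

  public InterruptClearingCallable(Callable<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public T call() throws Exception {
    try {
      return delegate.call();
    } finally {
      // Thread.interrupted() both reads and clears the flag. Note this
      // only covers interrupts delivered while call() is running; an
      // interrupt that lands after this point can still reach the thread.
      Thread.interrupted();
    }
  }
}
{code}

Usage would be something like {{executor.submit(new InterruptClearingCallable<Void>(existingCallable))}} wherever the sink currently submits its callables.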

> HDFS Sink can end up in a bad state if the Callables are interrupted.
> ---------------------------------------------------------------------
>
>                 Key: FLUME-1748
>                 URL: https://issues.apache.org/jira/browse/FLUME-1748
>             Project: Flume
>          Issue Type: Bug
>            Reporter: Hari Shreedharan
>         Attachments: FLUME-1748.patch
>
>
> If one or more HDFS Sink write/flush/close calls take too long, the thread is
> interrupted by the future.cancel(true) call. The Hadoop RPC client re-sets the
> interrupt flag on a thread that was interrupted while waiting for an RPC
> response. Also, if the thread is interrupted after the RPC call completes but
> before the call() method returns, the interrupt flag stays set on the thread.
> A subsequent HDFS file open call on that thread then fails with an exception
> of this sort:
> {code}
> [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457) - HDFS IO error
> java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020;
>         at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
>         at org.apache.hadoop.ipc.Client.call(Client.java:1164)
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>         at $Proxy9.create(Unknown Source)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>         at java.lang.reflect.Method.invoke(Method.java:597)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>         at $Proxy9.create(Unknown Source)
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
>         at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
>         at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
>         at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
>         at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
>         at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
>         at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
>         at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
>         at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
>         at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
>         at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
>         at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>         at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
>         at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
>         at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
>         at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedByInterruptException
>         at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
>         at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
>         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
>         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
>         at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
>         at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
>         at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
>         at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
>         at org.apache.hadoop.ipc.Client.call(Client.java:1140)
>         ... 36 more
> {code}
> The relevant code that re-sets the interrupt flag is in
> http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
> The method is Client#call(RPC.RpcKind, Writable, ConnectionId). Note that the
> InterruptedException is caught and the interrupt flag is re-set:
> {code}
>   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>       ConnectionId remoteId) throws InterruptedException, IOException {
>     Call call = new Call(rpcKind, rpcRequest);
>     Connection connection = getConnection(remoteId, call);
>     connection.sendParam(call);    // send the parameter
>     boolean interrupted = false;
>     synchronized (call) {
>       while (!call.done) {
>         try {
>           call.wait();             // wait for the result
>         } catch (InterruptedException ie) {
>           // save the fact that we were interrupted
>           interrupted = true;
>         }
>       }
>       if (interrupted) {
>         // set the interrupt flag now that we are done waiting
>         Thread.currentThread().interrupt();
>       }
>       if (call.error != null) {
>         if (call.error instanceof RemoteException) {
>           call.error.fillInStackTrace();
>           throw call.error;
>         } else { // local exception
>           InetSocketAddress address = connection.getRemoteAddress();
>           throw NetUtils.wrapException(address.getHostName(),
>                   address.getPort(),
>                   NetUtils.getHostname(),
>                   0,
>                   call.error);
>         }
>       } else {
>         return call.getRpcResult();
>       }
>     }
>   }
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira