Hari Shreedharan created FLUME-1748:
---------------------------------------

             Summary: HDFS Sink can end up in a bad state if the Callables are interrupted.
                 Key: FLUME-1748
                 URL: https://issues.apache.org/jira/browse/FLUME-1748
             Project: Flume
          Issue Type: Bug
            Reporter: Hari Shreedharan


If one or more HDFS Sink write/flush/close calls take too long, the thread running
them is interrupted by the future.cancel(true) call. The Hadoop RPC client catches
the resulting InterruptedException and re-sets the interrupt flag on the thread.
Likewise, if the thread is interrupted after the RPC call completes but before the
Callable's call() method returns, the interrupt flag stays set on the thread. A
subsequent HDFS file open on that thread then fails with an exception of this sort:

{code}
[SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020;
        at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
        at org.apache.hadoop.ipc.Client.call(Client.java:1164)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
        at $Proxy9.create(Unknown Source)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
        at $Proxy9.create(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
        at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
        at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
        at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
        at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
        at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
        at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
        at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
        at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
        at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
        at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
        at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
        at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
        at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
        at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
        at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
        at org.apache.hadoop.ipc.Client.call(Client.java:1140)
        ... 36 more
{code}
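
The mechanism is easy to demonstrate outside Flume. The following is a minimal,
self-contained sketch (not Flume or Hadoop code; the class name and the local
listener are purely illustrative) showing that a thread whose interrupt status
has been left set fails its next interruptible NIO operation immediately with
ClosedByInterruptException, which is exactly the connect failure at the bottom of
the stack trace above.

{code}
import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class LingeringInterruptDemo {
  public static void main(String[] args) throws Exception {
    // Local listener so the connect below has a reachable target; the failure
    // actually happens before any network I/O is attempted.
    try (ServerSocketChannel server = ServerSocketChannel.open()) {
      server.socket().bind(new InetSocketAddress("127.0.0.1", 0));
      InetSocketAddress addr =
          (InetSocketAddress) server.socket().getLocalSocketAddress();

      // Simulate the lingering flag: this is the state the worker thread is
      // left in when Client.call re-sets the interrupt, or when
      // future.cancel(true) lands after the RPC has returned but before the
      // Callable finishes.
      Thread.currentThread().interrupt();

      // The next interruptible channel operation on this thread -- the same
      // SocketChannel.connect the Hadoop IPC client uses -- fails at once, and
      // the interrupt status remains set afterwards.
      try (SocketChannel ch = SocketChannel.open()) {
        ch.connect(addr);
      } catch (ClosedByInterruptException e) {
        System.out.println("connect on a poisoned thread: " + e);
      }
    }
  }
}
{code}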

The code that re-sets the interrupt flag is at 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup

The method is Client#call(RPC.RpcKind, Writable, ConnectionId). Note that the 
InterruptedException is caught and the interrupt flag is re-set once the wait 
for the RPC result is done:

{code}
  public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
      ConnectionId remoteId) throws InterruptedException, IOException {
    Call call = new Call(rpcKind, rpcRequest);
    Connection connection = getConnection(remoteId, call);
    connection.sendParam(call);                 // send the parameter
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception
          InetSocketAddress address = connection.getRemoteAddress();
          throw NetUtils.wrapException(address.getHostName(),
                  address.getPort(),
                  NetUtils.getHostname(),
                  0,
                  call.error);
        }
      } else {
        return call.getRpcResult();
      }
    }
  }
{code}
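
For context, the interrupt in question originates from the sink's
call-with-timeout pattern. The sketch below is a simplified illustration of
that pattern, not the actual HDFSEventSink/BucketWriter code; the names
callWithTimeout and hdfsOp are made up for this example. When the submitted
HDFS operation overruns its timeout, future.cancel(true) interrupts the pool
thread running it, and, combined with the Client.call behaviour above, that
interrupt can outlive the Callable and poison later HDFS calls on the same
thread.

{code}
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CallWithTimeoutSketch {

  // Run an HDFS write/flush/close on a pool thread and give up after
  // timeoutMs, the way the HDFS sink bounds its HDFS calls.
  static <T> T callWithTimeout(ExecutorService pool, long timeoutMs,
                               Callable<T> hdfsOp) throws IOException {
    Future<T> future = pool.submit(hdfsOp);
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // This is the interrupt the description refers to. It is delivered to
      // the worker thread that is (or has just finished) executing hdfsOp;
      // if the InterruptedException is swallowed and the flag re-set, or the
      // interrupt arrives after the RPC has returned, the flag stays on the
      // pool thread for the next HDFS call.
      future.cancel(true);
      throw new IOException("HDFS call timed out after " + timeoutMs + " ms", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for HDFS call", e);
    } catch (ExecutionException e) {
      throw new IOException("HDFS call failed", e.getCause());
    }
  }
}
{code}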
