[ 
https://issues.apache.org/jira/browse/HADOOP-14214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mingliang Liu updated HADOOP-14214:
-----------------------------------
    Status: Patch Available  (was: Open)

> DomainSocketWatcher::add()/delete() should not self interrupt while looping 
> await()
> -----------------------------------------------------------------------------------
>
>                 Key: HADOOP-14214
>                 URL: https://issues.apache.org/jira/browse/HADOOP-14214
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: hdfs-client
>            Reporter: Mingliang Liu
>            Assignee: Mingliang Liu
>            Priority: Critical
>         Attachments: HADOOP-14214.000.patch
>
>
> Our Hive team found a TPCDS job whose queries, running on LLAP, appeared to 
> be stuck. Dozens of threads were waiting for the 
> {{DfsClientShmManager::lock}}, as shown in the following jstack:
> {code}
> Thread 251 (IO-Elevator-Thread-5):
>   State: WAITING
>   Blocked count: 3871
>   Waited count: 4565
>   Waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@16ead198
>   Stack:
>     sun.misc.Unsafe.park(Native Method)
>     java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1976)
>     
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
>     
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
>     
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1017)
>     
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:476)
>     
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:784)
>     
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:718)
>     
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:422)
>     
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:333)
>     
> org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1181)
>     
> org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1118)
>     org.apache.hadoop.hdfs.DFSInputStream.pread(DFSInputStream.java:1478)
>     org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1441)
>     org.apache.hadoop.fs.FSInputStream.readFully(FSInputStream.java:121)
>     
> org.apache.hadoop.fs.FSDataInputStream.readFully(FSDataInputStream.java:111)
>     
> org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readStripeFooter(RecordReaderUtils.java:166)
>     
> org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata.<init>(OrcStripeMetadata.java:64)
>     
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.readStripesMetadata(OrcEncodedDataReader.java:622)
> {code}
> The thread that is expected to signal those threads is in the 
> {{DomainSocketWatcher::add()}} method, but it is stuck there handling 
> InterruptedException in an infinite loop. Its jstack is:
> {code}
> Thread 44417 (TezTR-257387_2840_12_10_52_0):
>   State: RUNNABLE
>   Blocked count: 3
>   Waited count: 5
>   Stack:
>     java.lang.Throwable.fillInStackTrace(Native Method)
>     java.lang.Throwable.fillInStackTrace(Throwable.java:783)
>     java.lang.Throwable.<init>(Throwable.java:250)
>     java.lang.Exception.<init>(Exception.java:54)
>     java.lang.InterruptedException.<init>(InterruptedException.java:57)
>     
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2034)
>     
> org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325)
>     
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266)
>     
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
>     
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1017)
>     
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:476)
>     
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:784)
>     
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:718)
>     
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:422)
>     
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:333)
>     
> org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1181)
>     
> org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1118)
>     org.apache.hadoop.hdfs.DFSInputStream.pread(DFSInputStream.java:1478)
>     org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1441)
>     org.apache.hadoop.fs.FSInputStream.readFully(FSInputStream.java:121)
> {code}
> The whole job makes no progress because of this.
> The thread in {{DomainSocketWatcher::add()}} is expected to eventually break 
> out of the while loop in which it waits for the newly added entry to be 
> removed by another thread. However, once this thread is interrupted, it can 
> end up holding the lock forever, so {{if (!toAdd.contains(entry))}} is 
> always false.
> {code:title=DomainSocketWatcher::add()}
>   public void add(DomainSocket sock, Handler handler) {
>     lock.lock();
>     try {
>       ......
>       toAdd.add(entry);
>       kick();
>       while (true) {
>         try {
>           processedCond.await();
>         } catch (InterruptedException e) {
>           Thread.currentThread().interrupt();
>         }
>         if (!toAdd.contains(entry)) {
>           break;
>         }
>       }
>     } finally {
>       lock.unlock();
>     }
>   }
> {code}
> The reason is that this method catches the InterruptedException and 
> re-interrupts itself while looping on await(). The await() method internally 
> calls {{AbstractQueuedSynchronizer::await()}}, which throws a new 
> InterruptedException immediately if the thread's interrupt status is set.
> {code:title=AbstractQueuedSynchronizer::await()}
>         public final void await() throws InterruptedException {
>             if (Thread.interrupted())
>                 throw new InterruptedException();
>             Node node = addConditionWaiter();
>             ...
> {code}
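To see the interaction concretely, here is a standalone sketch (the class name {{SelfInterruptDemo}} is hypothetical, not Hadoop code) reproducing the pattern: once the interrupt status is set while the lock is held, every await() throws immediately without ever parking, so the lock is never released to the thread that would signal.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SelfInterruptDemo {
    // Returns the number of await() calls that threw immediately.
    static int demo() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock();
        int throwCount = 0;
        try {
            // Simulate an interrupt arriving while the lock is held.
            Thread.currentThread().interrupt();
            // Mirror the add() loop for three iterations: each await()
            // sees the interrupt status, throws without parking (so the
            // lock is never released to a signalling thread), and the
            // catch block re-sets the status for the next iteration.
            for (int i = 0; i < 3; i++) {
                try {
                    cond.await();
                } catch (InterruptedException e) {
                    throwCount++;
                    Thread.currentThread().interrupt(); // self-interrupt, as in add()
                }
            }
        } finally {
            Thread.interrupted(); // clear the flag before leaving the demo
            lock.unlock();
        }
        return throwCount;
    }

    public static void main(String[] args) {
        System.out.println("awaits that threw immediately: " + demo());
    }
}
```

All three await() calls throw without waiting, which is exactly the busy spin observed in the RUNNABLE jstack above.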
> Our code in {{DomainSocketWatcher::add()}} catches this exception (again) 
> and re-interrupts itself (again). Note that throughout this cycle the 
> associated lock is never released, so the other thread, which is supposed to 
> make {{if (!toAdd.contains(entry))}} true, remains blocked on the lock. 
> {{DomainSocketWatcher::delete()}} has similar logic and should suffer from 
> the same problem. 
> Thanks [~jdere] for testing and reporting this.
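One common idiom for breaking this cycle is to defer the interrupt: record that an InterruptedException occurred, keep waiting without re-asserting the interrupt status, and restore the flag only after the loop exits and the lock is released. The sketch below (class and method names are hypothetical; this is an illustration of the general idiom, not necessarily the attached patch) shows the pattern applied to an add()-style wait loop.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class DeferredInterruptAdd {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition processedCond = lock.newCondition();
    private final Set<Object> toAdd = new HashSet<>();

    // Blocks until 'entry' has been consumed by another thread. An
    // interrupt is recorded rather than re-asserted inside the loop, so
    // subsequent await() calls can actually park (releasing the lock);
    // the interrupt status is restored once the loop has exited.
    public void add(Object entry) {
        lock.lock();
        boolean interrupted = false;
        try {
            toAdd.add(entry);
            while (toAdd.contains(entry)) {
                try {
                    processedCond.await();
                } catch (InterruptedException e) {
                    interrupted = true; // defer: do NOT re-interrupt here
                }
            }
        } finally {
            lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore for the caller
            }
        }
    }

    // The watcher-thread side: consume the entry and wake up add().
    public void consume(Object entry) {
        lock.lock();
        try {
            toAdd.remove(entry);
            processedCond.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Demonstrates that add() completes even when interrupted mid-wait.
    static boolean demo() throws InterruptedException {
        DeferredInterruptAdd w = new DeferredInterruptAdd();
        Object entry = new Object();
        Thread adder = new Thread(() -> w.add(entry));
        adder.start();
        Thread.sleep(100);   // let the adder reach await()
        adder.interrupt();   // the interrupt no longer livelocks the loop
        Thread.sleep(100);
        w.consume(entry);    // another thread consumes the entry
        adder.join(5000);
        return !adder.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("adder finished: " + demo());
    }
}
```

The key difference from the buggy loop is that the catch block only records the interrupt, so the next await() parks normally and releases the lock, allowing the consuming thread to remove the entry and signal.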



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
