After noticing a lot of threads, I turned on debug logging for the HBase
client and saw this line many times, with the count steadily climbing:
HBaseClient:531 - IPC Client (687163870) connection to /10.1.37.21:60020from jeff: starting, having connections 1364

At that point my code had 1364 separate connections (and threads).  Those
connections eventually drop off once they have been idle longer than
"hbase.ipc.client.connection.maxidletime" (read via
conf.getInt("hbase.ipc.client.connection.maxidletime", 10000), i.e. 10
seconds by default).  But during periods of activity the number of threads
can get very high.

I also confirmed the large number of threads with:

jstack <pid> | grep IPC
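If attaching jstack isn't convenient (inside a unit test, say), a rough in-process check is to count live threads by name. This is a minimal sketch, assuming the client threads are named with the "IPC Client" prefix seen in the debug log line above; the class and method names are made up for illustration.

```java
import java.util.Map;

public class IpcThreadCount {

    // Counts live threads whose name starts with the given prefix.
    // Assumption: HBase IPC client threads are named "IPC Client (...) ...".
    static long countThreads(String prefix) {
        Map<Thread, StackTraceElement[]> all = Thread.getAllStackTraces();
        return all.keySet().stream()
                .filter(t -> t.getName().startsWith(prefix))
                .count();
    }

    public static void main(String[] args) {
        System.out.println("IPC Client threads: " + countThreads("IPC Client"));
    }
}
```

Calling countThreads("IPC Client") before and after a burst of requests gives roughly the same signal as the jstack/grep pipeline.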


So I started digging around in the code...

HBaseClient.getConnection attempts to reuse existing connections:

    ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
    do {
      synchronized (connections) {
        // Reuse the pooled connection for this id, or create and pool a new one.
        connection = connections.get(remoteId);
        if (connection == null) {
          LOG.error("poolsize: "+getPoolSize(conf));
          connection = new Connection(remoteId);
          connections.put(remoteId, connection);
        }
      }
      // addCall() returns false if the connection is closing; loop and retry.
    } while (!connection.addCall(call));


It does this by using the ConnectionId as the key into the connection pool.
All of this seems fine, except that two ConnectionIds built for the same
server and user never compare equal (or hash to the same value), so no
connection is ever reused.
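To make the pooling argument concrete, here is a stripped-down get-or-create pool with the same shape as getConnection. Key and Conn are hypothetical stand-ins for ConnectionId and Connection; the point is only that a HashMap-backed pool reuses an entry when, and only when, the new key is equal to (and hashes like) the old one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class PoolSketch {

    // Hypothetical stand-in for ConnectionId with value-based equals/hashCode.
    static final class Key {
        final String address;
        final String user;
        Key(String address, String user) { this.address = address; this.user = user; }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return address.equals(k.address) && Objects.equals(user, k.user);
        }

        @Override public int hashCode() { return Objects.hash(address, user); }
    }

    // Hypothetical stand-in for Connection (the real one owns a socket and a thread).
    static final class Conn {
        final Key id;
        Conn(Key id) { this.id = id; }
    }

    private final Map<Key, Conn> connections = new HashMap<>();

    // Get-or-create under a lock, mirroring the structure of getConnection.
    Conn getConnection(Key id) {
        synchronized (connections) {
            return connections.computeIfAbsent(id, Conn::new);
        }
    }

    int size() {
        synchronized (connections) { return connections.size(); }
    }

    public static void main(String[] args) {
        PoolSketch pool = new PoolSketch();
        Conn a = pool.getConnection(new Key("10.1.37.21:60020", "jeff"));
        Conn b = pool.getConnection(new Key("10.1.37.21:60020", "jeff"));
        System.out.println(a == b);       // true: equal keys share one entry
        System.out.println(pool.size());  // 1
    }
}
```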

From my understanding of the code, here is why.

In HBaseClient.ConnectionId:

    @Override
    public boolean equals(Object obj) {
      if (obj instanceof ConnectionId) {
        ConnectionId id = (ConnectionId) obj;
        // If User doesn't override equals(), ticket.equals() is an identity check.
        return address.equals(id.address) && protocol == id.protocol &&
               ((ticket != null && ticket.equals(id.ticket)) ||
                (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout;
      }
      return false;
    }

    @Override  // simply use the default Object#hashcode() ?
    public int hashCode() {
      return (address.hashCode() + PRIME * (
                  PRIME * System.identityHashCode(protocol) ^
             (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
    }

Both methods use the protocol and the ticket.  Tracing back through all of
the layers, I think I found the problem.

Problem:

In HBaseRPC.java:

  public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
      long clientVersion, InetSocketAddress addr, Configuration conf,
      SocketFactory factory, int rpcTimeout) throws IOException {
    return getProxy(protocol, clientVersion, addr,
        User.getCurrent(), conf, factory, rpcTimeout);
  }

User.getCurrent() always returns a new User object, and that instance is
eventually passed down to ConnectionId as the ticket.  However, User doesn't
override hashCode() or equals(), so one ConnectionId will never match
another ConnectionId.
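A small standalone reproduction of the failure mode, using hypothetical stand-ins rather than the real HBase classes: Ticket plays the role of User (no equals()/hashCode() override), getCurrent() returns a fresh instance like User.getCurrent(), and ConnectionId keeps the same ticket comparison as the real equals(). The hashCode() here is simplified, but since equals() already fails, the result doesn't depend on it.

```java
import java.util.HashMap;
import java.util.Map;

public class IdentityTicketDemo {

    // Hypothetical stand-in for User: does NOT override equals()/hashCode(),
    // so two instances describing the same user still compare by identity.
    static final class Ticket {
        final String name;
        Ticket(String name) { this.name = name; }
    }

    // Hypothetical stand-in for User.getCurrent(): a fresh object on every call.
    static Ticket getCurrent() { return new Ticket("jeff"); }

    // Simplified ConnectionId whose equality includes the ticket, as in the original.
    static final class ConnectionId {
        final String address;
        final Ticket ticket;
        ConnectionId(String address, Ticket ticket) { this.address = address; this.ticket = ticket; }

        @Override public boolean equals(Object obj) {
            if (!(obj instanceof ConnectionId)) return false;
            ConnectionId id = (ConnectionId) obj;
            return address.equals(id.address)
                && ((ticket != null && ticket.equals(id.ticket)) || ticket == id.ticket);
        }

        @Override public int hashCode() {
            return address.hashCode() * 31 + (ticket == null ? 0 : ticket.hashCode());
        }
    }

    // Simulates `calls` requests to the same server, each building a new ConnectionId.
    static int poolSizeAfter(int calls) {
        Map<ConnectionId, Object> connections = new HashMap<>();
        for (int i = 0; i < calls; i++) {
            ConnectionId id = new ConnectionId("10.1.37.21:60020", getCurrent());
            connections.computeIfAbsent(id, k -> new Object());
        }
        return connections.size();
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfter(5)); // 5: no entry is ever reused
    }
}
```

Every call to the same server adds a new pool entry, which in the real client means a new connection and a new thread.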


There are several possible solutions:
1. Implement hashCode() and equals() for User.
2. Create only one User object and reuse it.
3. Ignore the ticket in ConnectionId (probably a bad idea).
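Solutions 1 and 2 can be sketched with the same kind of stand-ins. This assumes User wraps some underlying identity object whose own equals() is meaningful; in HBase that would be the wrapped Hadoop UserGroupInformation, but whether its equality is the right notion here (especially with SecureHadoop) is an assumption, not something this sketch verifies. Ticket, CURRENT_PRINCIPAL, getCurrentFresh() and getCurrentCached() are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TicketFixes {

    // Hypothetical underlying identity that a User-like wrapper delegates to.
    static final Object CURRENT_PRINCIPAL = new Object();

    // Solution 1 (sketch): equals/hashCode delegate to the wrapped identity,
    // so two wrappers around the same identity are equal.
    static final class Ticket {
        final Object principal;
        Ticket(Object principal) { this.principal = principal; }

        @Override public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Ticket)) return false;
            return principal.equals(((Ticket) o).principal);
        }

        @Override public int hashCode() { return principal.hashCode(); }
    }

    // Still returns a fresh wrapper per call, as User.getCurrent() does.
    static Ticket getCurrentFresh() { return new Ticket(CURRENT_PRINCIPAL); }

    // Solution 2 (sketch): create the ticket once and hand out the same instance.
    private static final Ticket CACHED = new Ticket(CURRENT_PRINCIPAL);
    static Ticket getCurrentCached() { return CACHED; }

    // Counts distinct (address, ticket) pool keys after `calls` requests.
    static int poolSize(int calls, boolean useCached) {
        Map<List<Object>, Object> connections = new HashMap<>();
        for (int i = 0; i < calls; i++) {
            Ticket t = useCached ? getCurrentCached() : getCurrentFresh();
            connections.computeIfAbsent(Arrays.asList("10.1.37.21:60020", t), k -> new Object());
        }
        return connections.size();
    }

    public static void main(String[] args) {
        System.out.println(poolSize(5, false)); // 1
        System.out.println(poolSize(5, true));  // 1
    }
}
```

Either change on its own keeps the pool at one entry per server; solution 2 doesn't even need the equals()/hashCode() override, since a single shared instance is always equal to itself.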


Thoughts?  Has anyone else noticed this behavior?  Should I open a JIRA
issue?

I originally ran into the problem because OS X limits the number of threads
per user (and I was unable to raise the limit), and our unit tests issue
requests quickly enough that I ran out of threads.  I tried all three
solutions and each worked fine for my application.  However, I'm not sure
how changing this behavior would affect other people's applications,
especially those that use SecureHadoop.



Thanks,
~Jeff

-- 
Jeff Whiting
Qualtrics Senior Software Engineer
je...@qualtrics.com
