[ https://issues.apache.org/jira/browse/ACCUMULO-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13860853#comment-13860853 ]
William Slacum edited comment on ACCUMULO-2027 at 1/2/14 10:07 PM: ------------------------------------------------------------------- I should've been more specific- I'm specifically taking issue with the way the thrift pool is closed. With a ZooCache, it's related specifically to a set of zookeeper hosts/quorum and a timeout, but the thrift pools are global. The way {{ZooKeeperInstance.close()}} is implemented, I should be able to have a client application with two {{ZooKeeperInstance}} objects that point to distinct {{ZooCache}} objects-- closing one shouldn't affect another. With a single {{ThriftUtil.close()}}, one {{ZooKeeperInstance}} object is closing resources for all other {{ZooKeeperInstance}} objects. was (Author: bills): I should've been more specific- I'm specifically taking issue with the way the thrift pool is closed. With a ZooCache, it's related specifically to a set of zookeeper hosts/quorum and a timeout, but the thrift pools are global. The way {{ZooKeeperInstance.close()}} is implemented, I should be able to have a client application with two {{ZooKeeperInstance}}s that point to distinct {{ZooCache}}s-- closing one shouldn't effect another. With a single {{ThriftUtil.close()}}, one {{ZooKeeperInstance}} is closing resources for all other {{ZooKeeperInstance}}s. > ZooKeeperInstance.close() not freeing resources in multithreaded env > -------------------------------------------------------------------- > > Key: ACCUMULO-2027 > URL: https://issues.apache.org/jira/browse/ACCUMULO-2027 > Project: Accumulo > Issue Type: Bug > Reporter: Keith Turner > Assignee: William Slacum > Priority: Critical > Fix For: 1.4.5, 1.5.1, 1.6.0 > > > While looking at the changes related to ZooKeeperInstance.close() in the > 1.4.5-SNAPSHOT branch I noticed there were race conditions where resources > were not properly released. 
One type of race condition is where a thread is > between a closed check in ZooKeeperInstance and calling a ZooCache method > when ZooKeeperInstance.close() is called. The following is an example > situation > # Thread 1 uses ZooKeeperInstance1 to get a zoocache. > # Thread 2 calls close() on ZooKeeperInstance1 which calls close() on zoocache > # Thread 1 uses the zoocache it has reference to, causing a new zookeeper > connection to be created. > Below is an example program that will trigger this behavior. For me this > little example program reliably shows a connected zookeeper after all of the > threads die. If I use 0 threads it will show a closed zookeeper connection > at the end. > {code:java} > static class WriteTask implements Runnable { > private BatchWriter writer; > private Random rand; > WriteTask(Connector conn) throws TableNotFoundException { > rand = new Random(); > writer = conn.createBatchWriter("foo5", 10000000, 30000, 1); > } > @Override > public void run() { > try { > while (true) { > Mutation m1 = new Mutation(String.format("%06d", > rand.nextInt(1000000))); > m1.put(String.format("%06d", rand.nextInt(100)), > String.format("%06d", rand.nextInt(100)), String.format("%06d", > rand.nextInt(1000000))); > writer.addMutation(m1); > writer.flush(); > } > } catch (Exception e) { > System.out.println(e.getMessage()); > } > } > } > static class ReadTask implements Runnable { > private Scanner scanner; > ReadTask(Connector conn) throws TableNotFoundException { > scanner = conn.createScanner("foo5", new Authorizations()); > } > @Override > public void run() { > try { > while (true) { > for (Entry<Key,Value> entry : scanner) { > } > } > } catch (Exception e) { > System.out.println(e.getMessage()); > } > } > } > @Test(timeout = 30000) > public void test2() throws Exception { > ZooKeeperInstance zki = new ZooKeeperInstance(accumulo.getInstanceName(), > accumulo.getZooKeepers()); > Connector conn = zki.getConnector("root", "superSecret"); > 
conn.tableOperations().create("foo5"); > ArrayList<Thread> threads = new ArrayList<Thread>(); > int numThreads = 10; > for (int i = 0; i < numThreads; i++) { > Thread t = new Thread(new WriteTask(conn)); > t.start(); > threads.add(t); > } > for (int i = 0; i < numThreads; i++) { > Thread t = new Thread(new ReadTask(conn)); > t.start(); > threads.add(t); > } > // let threads get spun up > Thread.sleep(1000); > ZooSession.printSessions(); > zki.close(); > // wait for the threads to die > for (Thread thread : threads) { > thread.join(); > } > ZooSession.printSessions(); > } > {code} > Below are some changes I made to ZooSession for debugging purposes. > {noformat} > diff --git > a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java > b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java > index b3db26f..475a21d 100644 > --- > a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java > +++ > b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java > @@ -20,6 +20,8 @@ > import java.net.UnknownHostException; > import java.util.HashMap; > import java.util.Map; > +import java.util.Map.Entry; > +import java.util.Set; > > import org.apache.accumulo.core.util.UtilWaitThread; > import org.apache.log4j.Logger; > @@ -29,7 +31,7 @@ > import org.apache.zookeeper.ZooKeeper; > import org.apache.zookeeper.ZooKeeper.States; > > -class ZooSession { > +public class ZooSession { > > private static final Logger log = Logger.getLogger(ZooSession.class); > > @@ -121,6 +123,8 @@ > > ZooSessionInfo zsi = sessions.get(sessionKey); > if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) { > + System.out.println("Removing closed session "); > + new Exception().printStackTrace(); > if (auth != null && sessions.get(readOnlySessionKey) == zsi) > sessions.remove(readOnlySessionKey); > zsi = null; > @@ -137,4 +141,13 @@ > } > return zsi.zooKeeper; > } > + > + public static synchronized void printSessions() { > + 
Set<Entry<String,ZooSessionInfo>> es = sessions.entrySet(); > + > + for (Entry<String,ZooSessionInfo> entry : es) { > + System.out.println(entry.getKey() + " " + > entry.getValue().zooKeeper.getState()); > + } > + } > + > } > {noformat} > With the above changes I will see an exception like the following when one of > the race conditions occurs. > {noformat} > Removing closed session > java.lang.Exception > at > org.apache.accumulo.core.zookeeper.ZooSession.getSession(ZooSession.java:127) > at > org.apache.accumulo.core.zookeeper.ZooReader.getSession(ZooReader.java:37) > at > org.apache.accumulo.core.zookeeper.ZooReader.getZooKeeper(ZooReader.java:41) > at > org.apache.accumulo.core.zookeeper.ZooCache.getZooKeeper(ZooCache.java:56) > at org.apache.accumulo.core.zookeeper.ZooCache.retry(ZooCache.java:127) > at org.apache.accumulo.core.zookeeper.ZooCache.get(ZooCache.java:233) > at org.apache.accumulo.core.zookeeper.ZooCache.get(ZooCache.java:188) > at > org.apache.accumulo.core.client.ZooKeeperInstance.getInstanceID(ZooKeeperInstance.java:156) > at > org.apache.accumulo.core.client.impl.TabletLocator.getInstance(TabletLocator.java:96) > at > org.apache.accumulo.core.client.impl.ThriftScanner.scan(ThriftScanner.java:245) > at > org.apache.accumulo.core.client.impl.ScannerIterator$Reader.run(ScannerIterator.java:94) > at > org.apache.accumulo.core.client.impl.ScannerIterator.hasNext(ScannerIterator.java:176) > at > org.apache.accumulo.minicluster.MiniAccumuloClusterTest$ReadTask.run(MiniAccumuloClusterTest.java:109) > at java.lang.Thread.run(Thread.java:662) > {noformat} -- This message was sent by Atlassian JIRA (v6.1.5#6160)