keith-turner closed pull request #376: ACCUMULO-4798 optimized stat in ZooCache
URL: https://github.com/apache/accumulo/pull/376
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java index 801ee2cef3..6fa4967b01 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java @@ -18,11 +18,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -60,14 +55,39 @@ private final Lock cacheReadLock = cacheLock.readLock(); private final HashMap<String,byte[]> cache; - private final HashMap<String,Stat> statCache; + private final HashMap<String,ZcStat> statCache; private final HashMap<String,List<String>> childrenCache; private final ZooReader zReader; + public static class ZcStat { + private long ephemeralOwner; + + public ZcStat() { + + } + + private ZcStat(Stat stat) { + this.ephemeralOwner = stat.getEphemeralOwner(); + } + + public long getEphemeralOwner() { + return ephemeralOwner; + } + + private void set(ZcStat cachedStat) { + this.ephemeralOwner = cachedStat.ephemeralOwner; + } + + @VisibleForTesting + public void setEphemeralOwner(long ephemeralOwner) { + this.ephemeralOwner = ephemeralOwner; + } + } + private static class ImmutableCacheCopies { final Map<String,byte[]> cache; - final Map<String,Stat> statCache; + final Map<String,ZcStat> statCache; final Map<String,List<String>> childrenCache; ImmutableCacheCopies() { @@ -76,7 +96,7 @@ childrenCache = 
Collections.emptyMap(); } - ImmutableCacheCopies(Map<String,byte[]> cache, Map<String,Stat> statCache, Map<String,List<String>> childrenCache) { + ImmutableCacheCopies(Map<String,byte[]> cache, Map<String,ZcStat> statCache, Map<String,List<String>> childrenCache) { this.cache = Collections.unmodifiableMap(new HashMap<>(cache)); this.statCache = Collections.unmodifiableMap(new HashMap<>(statCache)); this.childrenCache = Collections.unmodifiableMap(new HashMap<>(childrenCache)); @@ -88,7 +108,7 @@ this.childrenCache = Collections.unmodifiableMap(new HashMap<>(childrenCache)); } - ImmutableCacheCopies(Map<String,byte[]> cache, Map<String,Stat> statCache, ImmutableCacheCopies prev) { + ImmutableCacheCopies(Map<String,byte[]> cache, Map<String,ZcStat> statCache, ImmutableCacheCopies prev) { this.cache = Collections.unmodifiableMap(new HashMap<>(cache)); this.statCache = Collections.unmodifiableMap(new HashMap<>(statCache)); this.childrenCache = prev.childrenCache; @@ -322,20 +342,20 @@ public T retry() { * status object to populate * @return path data, or null if non-existent */ - public byte[] get(final String zPath, final Stat status) { + public byte[] get(final String zPath, final ZcStat status) { ZooRunnable<byte[]> zr = new ZooRunnable<byte[]>() { @Override public byte[] run() throws KeeperException, InterruptedException { - Stat stat = null; + ZcStat zstat = null; // only read volatile once so following code works with a consistent snapshot ImmutableCacheCopies lic = immutableCache; byte[] val = lic.cache.get(zPath); if (val != null || lic.cache.containsKey(zPath)) { if (status != null) { - stat = lic.statCache.get(zPath); - copyStats(status, stat); + zstat = lic.statCache.get(zPath); + copyStats(status, zstat); } return val; } @@ -348,7 +368,7 @@ public T retry() { cacheWriteLock.lock(); try { final ZooKeeper zooKeeper = getZooKeeper(); - stat = zooKeeper.exists(zPath, watcher); + Stat stat = zooKeeper.exists(zPath, watcher); byte[] data = null; if (stat == null) 
{ if (log.isTraceEnabled()) { @@ -357,6 +377,7 @@ public T retry() { } else { try { data = zooKeeper.getData(zPath, watcher, stat); + zstat = new ZcStat(stat); } catch (KeeperException.BadVersionException e1) { throw new ConcurrentModificationException(); } catch (KeeperException.NoNodeException e2) { @@ -366,8 +387,8 @@ public T retry() { log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data, UTF_8))); } } - put(zPath, data, stat); - copyStats(status, stat); + put(zPath, data, zstat); + copyStats(status, zstat); return data; } finally { cacheWriteLock.unlock(); @@ -386,26 +407,13 @@ public T retry() { * @param cachedStat * cached statistic, that is or will be cached */ - protected void copyStats(Stat userStat, Stat cachedStat) { + protected void copyStats(ZcStat userStat, ZcStat cachedStat) { if (userStat != null && cachedStat != null) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - cachedStat.write(dos); - dos.close(); - - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - DataInputStream dis = new DataInputStream(bais); - userStat.readFields(dis); - - dis.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } + userStat.set(cachedStat); } } - private void put(String zPath, byte[] data, Stat stat) { + private void put(String zPath, byte[] data, ZcStat stat) { cacheWriteLock.lock(); try { cache.put(zPath, data); diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java index 90fb4aa4a4..9cf5fd4e4b 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; +import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat; import 
org.apache.accumulo.fate.zookeeper.ZooUtil.LockID; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.zookeeper.KeeperException; @@ -411,7 +412,7 @@ public static boolean isLockHeld(ZooCache zc, LockID lid) { if (!lid.node.equals(lockNode)) return false; - Stat stat = new Stat(); + ZcStat stat = new ZcStat(); return zc.get(lid.path + "/" + lid.node, stat) != null && stat.getEphemeralOwner() == lid.eid; } @@ -429,7 +430,7 @@ public static boolean isLockHeld(ZooCache zc, LockID lid) { return zk.getData(path + "/" + lockNode, false, null); } - public static byte[] getLockData(org.apache.accumulo.fate.zookeeper.ZooCache zc, String path, Stat stat) { + public static byte[] getLockData(org.apache.accumulo.fate.zookeeper.ZooCache zc, String path, ZcStat stat) { List<String> children = zc.getChildren(path); @@ -461,7 +462,7 @@ public static long getSessionId(ZooCache zc, String path) throws KeeperException String lockNode = children.get(0); - Stat stat = new Stat(); + ZcStat stat = new ZcStat(); if (zc.get(path + "/" + lockNode, stat) != null) return stat.getEphemeralOwner(); return 0; diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java index 6c35ed1ebb..6d323b7f5e 100644 --- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java +++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java @@ -33,7 +33,9 @@ import static org.junit.Assert.assertTrue; import java.util.List; +import java.util.Random; +import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -75,13 +77,13 @@ public void testGet_FillStat() throws Exception { } private void testGet(boolean fillStat) throws Exception { - Stat myStat = null; + ZcStat myStat = null; if (fillStat) { - myStat = new Stat(); + myStat 
= new ZcStat(); } - long now = System.currentTimeMillis(); + final long ephemeralOwner = new Random().nextLong(); Stat existsStat = new Stat(); - existsStat.setMtime(now); + existsStat.setEphemeralOwner(ephemeralOwner); expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat); expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andReturn(DATA); replay(zk); @@ -90,7 +92,7 @@ private void testGet(boolean fillStat) throws Exception { assertArrayEquals(DATA, (fillStat ? zc.get(ZPATH, myStat) : zc.get(ZPATH))); verify(zk); if (fillStat) { - assertEquals(now, myStat.getMtime()); + assertEquals(ephemeralOwner, myStat.getEphemeralOwner()); } assertTrue(zc.dataCached(ZPATH)); diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java index 782981c2cc..f179acd4fb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.util.Halt; import org.apache.accumulo.server.util.time.SimpleTimer; @@ -54,7 +55,6 @@ import org.apache.zookeeper.KeeperException.NotEmptyException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -282,7 +282,7 @@ private synchronized void checkServer(final Set<TServerInstance> updates, final TServerInfo info = current.get(zPath); final String lockPath = path + "/" + zPath; - Stat stat = new Stat(); + ZcStat stat = new ZcStat(); byte[] 
lockData = ZooLock.getLockData(getZooCache(), lockPath, stat); if (lockData == null) { diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java index cb7dc28dcc..8d0e7f39d6 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java @@ -24,7 +24,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.fate.zookeeper.ZooCache; -import org.apache.zookeeper.data.Stat; +import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat; import org.easymock.EasyMock; import org.easymock.IAnswer; import org.junit.Test; @@ -55,11 +55,11 @@ public void testQualifySessionId() { String serverPath = root + "/" + server; EasyMock.expect(zc.getChildren(serverPath)).andReturn(Collections.singletonList("child")); - EasyMock.expect(zc.get(EasyMock.eq(serverPath + "/child"), EasyMock.anyObject(Stat.class))).andAnswer(new IAnswer<byte[]>() { + EasyMock.expect(zc.get(EasyMock.eq(serverPath + "/child"), EasyMock.anyObject(ZcStat.class))).andAnswer(new IAnswer<byte[]>() { @Override public byte[] answer() throws Throwable { - Stat stat = (Stat) EasyMock.getCurrentArguments()[1]; + ZcStat stat = (ZcStat) EasyMock.getCurrentArguments()[1]; stat.setEphemeralOwner(session); return new byte[0]; } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services