Github user ortutay commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/298#discussion_r188777276
--- Diff:
phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java ---
@@ -77,57 +153,164 @@ public MemoryManager getMemoryManager() {
return memoryManager;
}
- private Cache<ImmutableBytesPtr,Closeable> getServerCaches() {
+ private Cache<ImmutableBytesPtr,CacheEntry> getServerCaches() {
/* Delay creation of this map until it's needed */
if (serverCaches == null) {
synchronized(this) {
if (serverCaches == null) {
- serverCaches = CacheBuilder.newBuilder()
- .expireAfterAccess(maxTimeToLiveMs,
TimeUnit.MILLISECONDS)
- .ticker(getTicker())
- .removalListener(new
RemovalListener<ImmutableBytesPtr, Closeable>(){
- @Override
- public void
onRemoval(RemovalNotification<ImmutableBytesPtr, Closeable> notification) {
-
Closeables.closeAllQuietly(Collections.singletonList(notification.getValue()));
- }
- })
- .build();
+ serverCaches = buildCache(maxTimeToLiveMs, false);
}
}
}
return serverCaches;
}
+
+ private Cache<ImmutableBytesPtr,CacheEntry>
getPersistentServerCaches() {
+ /* Delay creation of this map until it's needed */
+ if (persistentServerCaches == null) {
+ synchronized(this) {
+ if (persistentServerCaches == null) {
+ persistentServerCaches =
buildCache(maxPersistenceTimeToLiveMs, true);
+ }
+ }
+ }
+ return persistentServerCaches;
+ }
+
+ private Cache<ImmutableBytesPtr, CacheEntry> buildCache(final int ttl,
final boolean isPersistent) {
+ return CacheBuilder.newBuilder()
+ .expireAfterAccess(ttl, TimeUnit.MILLISECONDS)
+ .ticker(getTicker())
+ .removalListener(new RemovalListener<ImmutableBytesPtr,
CacheEntry>(){
+ @Override
+ public void
onRemoval(RemovalNotification<ImmutableBytesPtr, CacheEntry> notification) {
+ if (isPersistent ||
!notification.getValue().getUsePersistentCache()) {
+
Closeables.closeAllQuietly(Collections.singletonList(notification.getValue()));
+ }
+ }
+ })
+ .build();
+ }
- @Override
+ private void evictInactiveEntries(long bytesNeeded) {
+ CacheEntry[] entries =
getPersistentServerCaches().asMap().values().toArray(new CacheEntry[]{});
+ Arrays.sort(entries);
+ long available = this.getMemoryManager().getAvailableMemory();
+ for (int i = 0; i < entries.length && available < bytesNeeded;
i++) {
+ CacheEntry entry = entries[i];
+ if (!entry.isLive()) {
+ getServerCaches().invalidate(entry.getCacheId());
+
getPersistentServerCaches().invalidate(entry.getCacheId());
+ available = this.getMemoryManager().getAvailableMemory();
+ }
+ }
+ }
+
+ private CacheEntry maybeGet(ImmutableBytesPtr cacheId) {
+ maybePromote(cacheId);
+ CacheEntry entry = getServerCaches().getIfPresent(cacheId);
+ return entry;
+ }
+
+ private void maybePromote(ImmutableBytesPtr cacheId) {
+ CacheEntry entry =
getPersistentServerCaches().getIfPresent(cacheId);
+ if (entry == null) {
+ return;
+ }
+ getServerCaches().put(cacheId, entry);
+ }
+
+ private void maybeDemote(ImmutableBytesPtr cacheId) {
+ CacheEntry entry = getServerCaches().getIfPresent(cacheId);
+ if (entry == null) {
+ return;
+ }
+ entry.decrementLiveQueryCount();
+ if (!entry.isLive()) {
+ getServerCaches().invalidate(cacheId);
+ }
+ }
+
+ public void debugPrintCaches() {
+ System.out.println("Live cache:" + getServerCaches());
+ for (ImmutableBytesPtr key :
getServerCaches().asMap().keySet()) {
+ System.out.println("- " +
Hex.encodeHexString(key.get()) +
+ " -> " +
getServerCaches().getIfPresent(key).size +
+ " lq:" +
getServerCaches().getIfPresent(key).liveQueriesCount +
+ " " +
Hex.encodeHexString(getServerCaches().getIfPresent(key).cachePtr.get()));
+ }
+ System.out.println("Persistent cache:" +
getPersistentServerCaches());
+ for (ImmutableBytesPtr key :
getPersistentServerCaches().asMap().keySet()) {
+ System.out.println("- " +
Hex.encodeHexString(key.get()) +
+ " -> " +
getPersistentServerCaches().getIfPresent(key).size +
+ " " +
Hex.encodeHexString(getPersistentServerCaches().getIfPresent(key).cachePtr.get()));
+ }
+ }
+
+ @Override
public Closeable getServerCache(ImmutableBytesPtr cacheId) {
getServerCaches().cleanUp();
- return getServerCaches().getIfPresent(cacheId);
+ CacheEntry entry = maybeGet(cacheId);
+ if (entry == null) {
+ return null;
+ }
+ return entry.closeable;
}
@Override
- public Closeable addServerCache(ImmutableBytesPtr cacheId,
ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory
cacheFactory, boolean useProtoForIndexMaintainer, int clientVersion) throws
SQLException {
+ public boolean checkServerCache(ImmutableBytesPtr cacheId, boolean
shouldIncrementLiveQueryCount) {
getServerCaches().cleanUp();
- MemoryChunk chunk =
this.getMemoryManager().allocate(cachePtr.getLength() + txState.length);
+ CacheEntry entry = maybeGet(cacheId);
+ if (entry != null && shouldIncrementLiveQueryCount) {
+ entry.incrementLiveQueryCount();
+ }
+ return entry != null;
+ }
+
+ @Override
+ public Closeable addServerCache(ImmutableBytesPtr cacheId,
ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory
cacheFactory, boolean useProtoForIndexMaintainer, boolean usePersistentCache,
int clientVersion) throws SQLException {
+ getServerCaches().cleanUp();
+ long available = this.getMemoryManager().getAvailableMemory();
+ int size = cachePtr.getLength() + txState.length;
+ if (size > available) {
+ evictInactiveEntries(Math.max(size - available +
EVICTION_MARGIN_BYTES, EVICTION_MARGIN_BYTES));
+ }
+ MemoryChunk chunk = this.getMemoryManager().allocate(size);
boolean success = false;
try {
- Closeable element = cacheFactory.newCache(cachePtr, txState,
chunk, useProtoForIndexMaintainer, clientVersion);
- getServerCaches().put(cacheId, element);
+ CacheEntry entry;
+ synchronized(this) {
+ entry = maybeGet(cacheId);
+ if (entry == null) {
+ entry = new CacheEntry(
+ cacheId, cachePtr, cacheFactory, txState,
chunk,
+ usePersistentCache, useProtoForIndexMaintainer,
+ clientVersion);
+ getServerCaches().put(cacheId, entry);
+ available =
this.getMemoryManager().getAvailableMemory();
+ if (usePersistentCache) {
+ getPersistentServerCaches().put(cacheId, entry);
+ }
+ }
+ entry.incrementLiveQueryCount();
+ }
success = true;
- return element;
+ return entry;
} finally {
if (!success) {
Closeables.closeAllQuietly(Collections.singletonList(chunk));
}
- }
+ }
}
-
+
@Override
- public void removeServerCache(ImmutableBytesPtr cacheId) {
- getServerCaches().invalidate(cacheId);
+ synchronized public void removeServerCache(ImmutableBytesPtr cacheId) {
--- End diff --
Yeah, good point, removed `maybeDemote`
---