PHOENIX-4666 Persistent subquery cache for hash joins Signed-off-by: Josh Elser <els...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cb697933 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cb697933 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cb697933 Branch: refs/heads/4.x-cdh5.15 Commit: cb6979333155b3d6b9fd0684304f52e9b33f42f4 Parents: 912215c Author: Marcell Ortutay <marcell.ortu...@gmail.com> Authored: Thu Mar 29 20:59:03 2018 +0100 Committer: Pedro Boado <pbo...@apache.org> Committed: Wed Oct 17 22:49:38 2018 +0100 ---------------------------------------------------------------------- .../end2end/join/HashJoinPersistentCacheIT.java | 167 +++++++++++++++ .../org/apache/phoenix/cache/GlobalCache.java | 22 +- .../apache/phoenix/cache/ServerCacheClient.java | 59 ++++-- .../org/apache/phoenix/cache/TenantCache.java | 2 +- .../apache/phoenix/cache/TenantCacheImpl.java | 209 ++++++++++++++++--- .../apache/phoenix/compile/QueryCompiler.java | 9 +- .../phoenix/compile/StatementContext.java | 21 +- .../coprocessor/HashJoinRegionScanner.java | 4 +- .../coprocessor/ServerCachingEndpointImpl.java | 2 +- .../generated/ServerCachingProtos.java | 117 +++++++++-- .../apache/phoenix/execute/HashJoinPlan.java | 104 +++++++-- .../phoenix/iterate/BaseResultIterators.java | 8 +- .../phoenix/iterate/TableResultIterator.java | 6 +- .../apache/phoenix/join/HashCacheClient.java | 24 ++- .../apache/phoenix/join/HashCacheFactory.java | 11 + .../java/org/apache/phoenix/parse/HintNode.java | 4 + .../org/apache/phoenix/query/QueryServices.java | 1 + .../phoenix/query/QueryServicesOptions.java | 1 + .../apache/phoenix/cache/TenantCacheTest.java | 112 ++++++++-- .../src/main/ServerCachingService.proto | 1 + phoenix-protocol/src/main/build-proto.sh | 6 + 21 files changed, 773 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinPersistentCacheIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinPersistentCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinPersistentCacheIT.java new file mode 100644 index 0000000..2f072b8 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinPersistentCacheIT.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.join; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.Properties; + +import org.apache.phoenix.end2end.join.HashJoinCacheIT.InvalidateHashCache; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Test; + +public class HashJoinPersistentCacheIT extends BaseJoinIT { + + @Override + protected String getTableName(Connection conn, String virtualName) throws Exception { + String realName = super.getTableName(conn, virtualName); + TestUtil.addCoprocessor(conn, SchemaUtil.normalizeFullTableName(realName), + InvalidateHashCache.class); + return realName; + } + + @Test + public void testPersistentCache() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + + createTestTable(getUrl(), + "CREATE TABLE IF NOT EXISTS states (state CHAR(2) " + + "NOT NULL, name VARCHAR NOT NULL CONSTRAINT my_pk PRIMARY KEY (state, name))"); + createTestTable(getUrl(), + "CREATE TABLE IF NOT EXISTS cities (state CHAR(2) " + + "NOT NULL, city VARCHAR NOT NULL, population BIGINT " + + "CONSTRAINT my_pk PRIMARY KEY (state, city))"); + + conn.prepareStatement( + "UPSERT INTO states VALUES ('CA', 'California')").executeUpdate(); + conn.prepareStatement( + "UPSERT INTO states VALUES ('AZ', 'Arizona')").executeUpdate(); + conn.prepareStatement( + "UPSERT INTO cities VALUES ('CA', 'San Francisco', 50000)").executeUpdate(); + conn.prepareStatement( + "UPSERT INTO cities VALUES ('CA', 'Sacramento', 3000)").executeUpdate(); + conn.prepareStatement( + "UPSERT INTO cities VALUES ('AZ', 'Phoenix', 20000)").executeUpdate(); + conn.commit(); + + /* First, run query without using the persistent cache. This should return + * different results after an UPSERT takes place. + */ + ResultSet rs = conn.prepareStatement( + "SELECT SUM(population) FROM states s " + +"JOIN cities c ON c.state = s.state").executeQuery(); + rs.next(); + int population1 = rs.getInt(1); + + conn.prepareStatement("UPSERT INTO cities VALUES ('CA', 'Mt View', 1500)").executeUpdate(); + conn.commit(); + rs = conn.prepareStatement( + "SELECT SUM(population) FROM states s " + + "JOIN cities c ON c.state = s.state").executeQuery(); + rs.next(); + int population2 = rs.getInt(1); + + assertEquals(73000, population1); + assertEquals(74500, population2); + + /* Second, run query using the persistent cache. This should return the + * same results after an UPSERT takes place. + */ + rs = conn.prepareStatement( + "SELECT /*+ USE_PERSISTENT_CACHE */ SUM(population) FROM states s " + + "JOIN cities c ON c.state = s.state").executeQuery(); + rs.next(); + int population3 = rs.getInt(1); + + conn.prepareStatement( + "UPSERT INTO cities VALUES ('CA', 'Palo Alto', 2000)").executeUpdate(); + conn.commit(); + + rs = conn.prepareStatement( + "SELECT /*+ USE_PERSISTENT_CACHE */ SUM(population) " + + "FROM states s JOIN cities c ON c.state = s.state").executeQuery(); + rs.next(); + int population4 = rs.getInt(1); + rs = conn.prepareStatement( + "SELECT SUM(population) FROM states s JOIN cities c ON c.state = s.state") + .executeQuery(); + rs.next(); + int population5 = rs.getInt(1); + + assertEquals(74500, population3); + assertEquals(74500, population4); + assertEquals(76500, population5); + + /* Let's make sure caches can be used across queries. We'll set up a + * cache, and make sure it is used on two different queries with the + * same subquery. + */ + + String sumQueryCached = "SELECT /*+ USE_PERSISTENT_CACHE */ SUM(population) " + + "FROM cities c JOIN (SELECT state FROM states WHERE state LIKE 'C%') sq " + + "ON sq.state = c.state"; + String distinctQueryCached = "SELECT /*+ USE_PERSISTENT_CACHE */ " + + "COUNT(DISTINCT(c.city)) FROM cities c " + + "JOIN (SELECT state FROM states WHERE state LIKE 'C%') sq " + + "ON sq.state = c.state"; + String sumQueryUncached = sumQueryCached.replace( + "/*+ USE_PERSISTENT_CACHE */", ""); + String distinctQueryUncached = distinctQueryCached.replace( + "/*+ USE_PERSISTENT_CACHE */", ""); + + rs = conn.prepareStatement(sumQueryCached).executeQuery(); + rs.next(); + int population6 = rs.getInt(1); + rs = conn.prepareStatement(distinctQueryCached).executeQuery(); + rs.next(); + int distinct1 = rs.getInt(1); + assertEquals(4, distinct1); + + // Add a new city that matches the queries. This should not affect results + // using persistent caching. + conn.prepareStatement("UPSERT INTO states VALUES ('CO', 'Colorado')").executeUpdate(); + conn.prepareStatement("UPSERT INTO cities VALUES ('CO', 'Denver', 6000)").executeUpdate(); + conn.commit(); + + rs = conn.prepareStatement(sumQueryCached).executeQuery(); + rs.next(); + int population7 = rs.getInt(1); + assertEquals(population6, population7); + rs = conn.prepareStatement(distinctQueryCached).executeQuery(); + rs.next(); + int distinct2 = rs.getInt(1); + assertEquals(distinct1, distinct2); + + // Finally, make sure uncached queries give up to date results + rs = conn.prepareStatement(sumQueryUncached).executeQuery(); + rs.next(); + int population8 = rs.getInt(1); + assertEquals(population8, 62500); + rs = conn.prepareStatement(distinctQueryUncached).executeQuery(); + rs.next(); + int distinct3 = rs.getInt(1); + assertEquals(5, distinct3); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java index ae77174..5f3e29b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java @@ -148,7 +148,12 @@ public class GlobalCache extends TenantCacheImpl { private GlobalCache(Configuration config) { super(new GlobalMemoryManager(getMaxMemorySize(config)), - config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS)); + config.getInt( + QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS), + config.getInt( + QueryServices.MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS)); this.config = config; } @@ -164,9 +169,18 @@ public class GlobalCache extends TenantCacheImpl { public TenantCache getChildTenantCache(ImmutableBytesPtr tenantId) { TenantCache tenantCache = perTenantCacheMap.get(tenantId); if (tenantCache == null) { - int maxTenantMemoryPerc = config.getInt(MAX_TENANT_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC); - int maxServerCacheTimeToLive = config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS); - TenantCacheImpl newTenantCache = new TenantCacheImpl(new ChildMemoryManager(getMemoryManager(), maxTenantMemoryPerc), maxServerCacheTimeToLive); + int maxTenantMemoryPerc = config.getInt( + MAX_TENANT_MEMORY_PERC_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC); + int maxServerCacheTimeToLive = config.getInt( + QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS); + int maxServerCachePersistenceTimeToLive = config.getInt( + QueryServices.MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS); + TenantCacheImpl newTenantCache = new TenantCacheImpl( + new ChildMemoryManager(getMemoryManager(), maxTenantMemoryPerc), + maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive); tenantCache = perTenantCacheMap.putIfAbsent(tenantId, newTenantCache); if (tenantCache == null) { tenantCache = newTenantCache; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index 5e284bd..011a6f8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.HTableInterface; @@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; @@ -59,6 +61,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCac import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService; +import org.apache.phoenix.expression.Expression; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.join.HashCacheFactory; @@ -75,6 +78,8 @@ import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; +import com.google.protobuf.ByteString; + /** * * Client for sending cache to each region server @@ -215,22 +220,46 @@ public class ServerCacheClient { } } } - } - - public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, - final ServerCacheFactory cacheFactory, final PTable cacheUsingTable) throws SQLException { - return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTable, false); + + public ServerCache createServerCache(byte[] cacheId, QueryPlan delegate) + throws SQLException, IOException { + PTable cacheUsingTable = delegate.getTableRef().getTable(); + ConnectionQueryServices services = delegate.getContext().getConnection().getQueryServices(); + List<HRegionLocation> locations = services.getAllTableRegions( + cacheUsingTable.getPhysicalName().getBytes()); + int nRegions = locations.size(); + Set<HRegionLocation> servers = new HashSet<>(nRegions); + cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable); + return new ServerCache(cacheId, servers, new ImmutableBytesWritable( + new byte[]{}), services, false); } - - public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, - final ServerCacheFactory cacheFactory, final PTable cacheUsingTable, boolean storeCacheOnClient) + + public ServerCache addServerCache( + ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, + final ServerCacheFactory cacheFactory, final PTable cacheUsingTable) throws SQLException { + return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTable, false); + } + + public ServerCache addServerCache( + ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, + final ServerCacheFactory cacheFactory, final PTable cacheUsingTable, + boolean storeCacheOnClient) throws SQLException { + final byte[] cacheId = ServerCacheClient.generateId(); + return addServerCache(keyRanges, cacheId, cachePtr, txState, cacheFactory, + cacheUsingTable, false, storeCacheOnClient); + } + + public ServerCache addServerCache( + ScanRanges keyRanges, final byte[] cacheId, final ImmutableBytesWritable cachePtr, + final byte[] txState, final ServerCacheFactory cacheFactory, + final PTable cacheUsingTable, final boolean usePersistentCache, + boolean storeCacheOnClient) throws SQLException { ConnectionQueryServices services = connection.getQueryServices(); List<Closeable> closeables = new ArrayList<Closeable>(); ServerCache hashCacheSpec = null; SQLException firstException = null; - final byte[] cacheId = generateId(); /** * Execute EndPoint in parallel on each server to send compressed hash cache */ @@ -251,7 +280,7 @@ public class ServerCacheClient { byte[] regionEndKey = entry.getRegionInfo().getEndKey(); if ( ! servers.contains(entry) && keyRanges.intersectRegion(regionStartKey, regionEndKey, - cacheUsingTable.getIndexType() == IndexType.LOCAL)) { + cacheUsingTable.getIndexType() == IndexType.LOCAL)) { // Call RPC once per server servers.add(entry); if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));} @@ -262,7 +291,7 @@ public class ServerCacheClient { @Override public Boolean call() throws Exception { - return addServerCache(htable, key, cacheUsingTable, cacheId, cachePtr, cacheFactory, txState); + return addServerCache(htable, key, cacheUsingTable, cacheId, cachePtr, cacheFactory, txState, usePersistentCache); } /** @@ -291,7 +320,7 @@ public class ServerCacheClient { for (Future<Boolean> future : futures) { future.get(timeoutMs, TimeUnit.MILLISECONDS); } - + cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable); success = true; } catch (SQLException e) { @@ -444,7 +473,7 @@ public class ServerCacheClient { } if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) { success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory, - txState); + txState, false); } return success; } finally { @@ -453,7 +482,7 @@ public class ServerCacheClient { } public boolean addServerCache(HTableInterface htable, byte[] key, final PTable cacheUsingTable, final byte[] cacheId, - final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final byte[] txState) + final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final byte[] txState, final boolean usePersistentCache) throws Exception { byte[] keyInRegion = getKeyInRegion(key); final Map<byte[], AddServerCacheResponse> results; @@ -483,6 +512,7 @@ public class ServerCacheClient { builder.setTenantId(ByteStringer.wrap(tenantIdBytes)); } builder.setCacheId(ByteStringer.wrap(cacheId)); + builder.setUsePersistentCache(usePersistentCache); builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr)); builder.setHasProtoBufIndexMaintainer(true); ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory @@ -501,7 +531,6 @@ public class ServerCacheClient { } if (results != null && results.size() == 1) { return results.values().iterator().next().getReturn(); } return false; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java index c4e82c2..e37d4d4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java @@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager; public interface TenantCache { MemoryManager getMemoryManager(); Closeable getServerCache(ImmutableBytesPtr cacheId); - Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, int clientVersion) throws SQLException; + Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, boolean usePersistentCache, int clientVersion) throws SQLException; void removeServerCache(ImmutableBytesPtr cacheId); void removeAllServerCache(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java index 1dc59bc..dc4c9e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java @@ -18,16 +18,22 @@ package org.apache.phoenix.cache; import java.io.Closeable; +import java.io.IOException; import java.sql.SQLException; +import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; +import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.memory.MemoryManager; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.util.Closeables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Ticker; import com.google.common.cache.Cache; @@ -44,18 +50,86 @@ import com.google.common.cache.RemovalNotification; * @since 0.1 */ public class TenantCacheImpl implements TenantCache { + private static final Logger logger = LoggerFactory.getLogger(TenantCacheImpl.class); private final int maxTimeToLiveMs; + private final int maxPersistenceTimeToLiveMs; private final MemoryManager memoryManager; private final Ticker ticker; - private volatile Cache<ImmutableBytesPtr, Closeable> serverCaches; - public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs) { - this(memoryManager, maxTimeToLiveMs, Ticker.systemTicker()); + // Two caches exist: the "serverCaches" cache which is used for handling live + // queries, and the "persistentServerCaches" cache which is used to store data + // between queries. If we are out of memory, attempt to clear out entries from + // the persistent cache before throwing an exception. + private volatile Cache<ImmutableBytesPtr, CacheEntry> serverCaches; + private volatile Cache<ImmutableBytesPtr, CacheEntry> persistentServerCaches; + + private final long EVICTION_MARGIN_BYTES = 10000000; + + private class CacheEntry implements Comparable<CacheEntry>, Closeable { + private ImmutableBytesPtr cacheId; + private ImmutableBytesWritable cachePtr; + private int hits; + private int liveQueriesCount; + private boolean usePersistentCache; + private long size; + private Closeable closeable; + + public CacheEntry(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, + ServerCacheFactory cacheFactory, byte[] txState, MemoryChunk chunk, + boolean usePersistentCache, boolean useProtoForIndexMaintainer, + int clientVersion) throws SQLException { + this.cacheId = cacheId; + this.cachePtr = cachePtr; + this.size = cachePtr.getLength(); + this.hits = 0; + this.liveQueriesCount = 0; + this.usePersistentCache = usePersistentCache; + this.closeable = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer, clientVersion); + } + + public void close() throws IOException { + this.closeable.close(); + } + + synchronized public void incrementLiveQueryCount() { + liveQueriesCount++; + hits++; + } + + synchronized public void decrementLiveQueryCount() { + liveQueriesCount--; + } + + synchronized public boolean isLive() { + return liveQueriesCount > 0; + } + + public boolean getUsePersistentCache() { + return usePersistentCache; + } + + public ImmutableBytesPtr getCacheId() { + return cacheId; + } + + private Float rank() { + return (float)hits; + } + + @Override + public int compareTo(CacheEntry o) { + return rank().compareTo(o.rank()); + } + } + + public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs, int maxPersistenceTimeToLiveMs) { + this(memoryManager, maxTimeToLiveMs, maxPersistenceTimeToLiveMs, Ticker.systemTicker()); } - public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs, Ticker ticker) { + public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs, int maxPersistenceTimeToLiveMs, Ticker ticker) { this.memoryManager = memoryManager; this.maxTimeToLiveMs = maxTimeToLiveMs; + this.maxPersistenceTimeToLiveMs = maxPersistenceTimeToLiveMs; this.ticker = ticker; } @@ -69,6 +143,9 @@ public class TenantCacheImpl implements TenantCache { if (serverCaches != null) { serverCaches.cleanUp(); } + if (persistentServerCaches != null) { + persistentServerCaches.cleanUp(); + } } } @@ -77,57 +154,133 @@ public class TenantCacheImpl implements TenantCache { return memoryManager; } - private Cache<ImmutableBytesPtr,Closeable> getServerCaches() { + private Cache<ImmutableBytesPtr,CacheEntry> getServerCaches() { /* Delay creation of this map until it's needed */ if (serverCaches == null) { synchronized(this) { if (serverCaches == null) { - serverCaches = CacheBuilder.newBuilder() - .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS) - .ticker(getTicker()) - .removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){ - @Override - public void onRemoval(RemovalNotification<ImmutableBytesPtr, Closeable> notification) { - Closeables.closeAllQuietly(Collections.singletonList(notification.getValue())); - } - }) - .build(); + serverCaches = buildCache(maxTimeToLiveMs, false); } } } return serverCaches; } - - @Override + + private Cache<ImmutableBytesPtr,CacheEntry> getPersistentServerCaches() { + /* Delay creation of this map until it's needed */ + if (persistentServerCaches == null) { + synchronized(this) { + if (persistentServerCaches == null) { + persistentServerCaches = buildCache(maxPersistenceTimeToLiveMs, true); + } + } + } + return persistentServerCaches; + } + + private Cache<ImmutableBytesPtr, CacheEntry> buildCache(final int ttl, final boolean isPersistent) { + CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder(); + if (isPersistent) { + builder.expireAfterWrite(ttl, TimeUnit.MILLISECONDS); + } else { + builder.expireAfterAccess(ttl, TimeUnit.MILLISECONDS); + } + return builder + .ticker(getTicker()) + .removalListener(new RemovalListener<ImmutableBytesPtr, CacheEntry>(){ + @Override + public void onRemoval(RemovalNotification<ImmutableBytesPtr, CacheEntry> notification) { + if (isPersistent || !notification.getValue().getUsePersistentCache()) { + Closeables.closeAllQuietly(Collections.singletonList(notification.getValue())); + } + } + }) + .build(); + } + + synchronized private void evictInactiveEntries(long bytesNeeded) { + logger.debug("Trying to evict inactive cache entries to free up " + bytesNeeded + " bytes"); + CacheEntry[] entries = getPersistentServerCaches().asMap().values().toArray(new CacheEntry[]{}); + Arrays.sort(entries); + long available = this.getMemoryManager().getAvailableMemory(); + for (int i = 0; i < entries.length && available < bytesNeeded; i++) { + CacheEntry entry = entries[i]; + ImmutableBytesPtr cacheId = entry.getCacheId(); + getPersistentServerCaches().invalidate(cacheId); + available = this.getMemoryManager().getAvailableMemory(); + logger.debug("Evicted cache ID " + Bytes.toLong(cacheId.get()) + ", we now have " + available + " bytes available"); + } + } + + private CacheEntry getIfPresent(ImmutableBytesPtr cacheId) { + CacheEntry entry = getPersistentServerCaches().getIfPresent(cacheId); + if (entry != null) { + return entry; + } + return getServerCaches().getIfPresent(cacheId); + } + + @Override public Closeable getServerCache(ImmutableBytesPtr cacheId) { getServerCaches().cleanUp(); - return getServerCaches().getIfPresent(cacheId); + CacheEntry entry = getIfPresent(cacheId); + if (entry == null) { + return null; + } + return entry.closeable; } - + @Override - public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, int clientVersion) throws SQLException { + public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, boolean usePersistentCache, int clientVersion) throws SQLException { getServerCaches().cleanUp(); - MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length); + long available = this.getMemoryManager().getAvailableMemory(); + int size = cachePtr.getLength() + txState.length; + if (size > available) { + evictInactiveEntries(size - available + EVICTION_MARGIN_BYTES); + } + MemoryChunk chunk = this.getMemoryManager().allocate(size); boolean success = false; try { - Closeable element = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer, clientVersion); - getServerCaches().put(cacheId, element); + CacheEntry entry; + synchronized(this) { + entry = getIfPresent(cacheId); + if (entry == null) { + entry = new CacheEntry( + cacheId, cachePtr, cacheFactory, txState, chunk, + usePersistentCache, useProtoForIndexMaintainer, + clientVersion); + getServerCaches().put(cacheId, entry); + if (usePersistentCache) { + getPersistentServerCaches().put(cacheId, entry); + } + } + entry.incrementLiveQueryCount(); + } success = true; - return element; + return entry; } finally { if (!success) { Closeables.closeAllQuietly(Collections.singletonList(chunk)); } - } + } } - + @Override - public void removeServerCache(ImmutableBytesPtr cacheId) { - getServerCaches().invalidate(cacheId); + synchronized public void removeServerCache(ImmutableBytesPtr cacheId) { + CacheEntry entry = getServerCaches().getIfPresent(cacheId); + if (entry == null) { + return; + } + entry.decrementLiveQueryCount(); + if (!entry.isLive()) { + logger.debug("Cache ID " + Bytes.toLong(cacheId.get()) + " is no longer live, invalidate it"); + getServerCaches().invalidate(cacheId); + } } @Override public void removeAllServerCache() { getServerCaches().invalidateAll(); + getPersistentServerCaches().invalidateAll(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 3e5f5ee..603da0b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -100,6 +100,7 @@ public class QueryCompiler { private final SequenceManager sequenceManager; private final boolean projectTuples; private final boolean noChildParentJoinOptimization; + private final boolean usePersistentCache; private final boolean optimizeSubquery; private final Map<TableRef, QueryPlan> dataPlans; private final boolean costBased; @@ -117,7 +118,8 @@ public class QueryCompiler { this.parallelIteratorFactory = parallelIteratorFactory; this.sequenceManager = sequenceManager; this.projectTuples = projectTuples; - this.noChildParentJoinOptimization = select.getHint().hasHint(Hint.NO_CHILD_PARENT_JOIN_OPTIMIZATION); + this.noChildParentJoinOptimization = select.getHint().hasHint(Hint.NO_CHILD_PARENT_JOIN_OPTIMIZATION) || select.getHint().hasHint(Hint.USE_PERSISTENT_CACHE); + this.usePersistentCache = select.getHint().hasHint(Hint.USE_PERSISTENT_CACHE); ConnectionQueryServices services = statement.getConnection().getQueryServices(); this.costBased = services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED); scan.setLoadColumnFamiliesOnDemand(true); @@ -314,7 +316,7 @@ public class QueryCompiler { if (i < count - 1) { fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size())); } - hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression); + hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), usePersistentCache, keyRangeLhsExpression, keyRangeRhsExpression); } TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true); @@ -381,9 +383,10 @@ public class QueryCompiler { HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[]{joinExpressions}, new JoinType[]{type == JoinType.Right ? JoinType.Left : type}, new boolean[]{true}, new PTable[]{lhsTable}, new int[]{fieldPosition}, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset)); + boolean usePersistentCache = joinTable.getStatement().getHint().hasHint(Hint.USE_PERSISTENT_CACHE); Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null); getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions); - return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())}); + return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, usePersistentCache, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())}); } case SORT_MERGE: { JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java index eb195c2..cc38870 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java @@ -74,6 +74,7 @@ public class StatementContext { private final ImmutableBytesWritable tempPtr; private final PhoenixStatement statement; private final Map<PColumn, Integer> dataColumns; + private Map<Long, Boolean> retryingPersistentCache; private long currentTime = QueryConstants.UNSET_TIMESTAMP; private ScanRanges scanRanges = ScanRanges.EVERYTHING; @@ -138,6 +139,7 @@ public class StatementContext { this.subqueryResults = Maps.<SelectStatement, Object> newHashMap(); this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled,connection.getLogLevel()); this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled,connection.getLogLevel()); + this.retryingPersistentCache = Maps.<Long, Boolean> newHashMap(); } /** @@ -326,5 +328,22 @@ public class StatementContext { public void setClientSideUpsertSelect(boolean isClientSideUpsertSelect) { this.isClientSideUpsertSelect = isClientSideUpsertSelect; } - + + /* + * setRetryingPersistentCache can be used to override the USE_PERSISTENT_CACHE hint and disable the use of the + * persistent cache for a specific cache ID. This can be used to retry queries that failed when using the persistent + * cache. + */ + public void setRetryingPersistentCache(long cacheId) { + retryingPersistentCache.put(cacheId, true); + } + + public boolean getRetryingPersistentCache(long cacheId) { + Boolean retrying = retryingPersistentCache.get(cacheId); + if (retrying == null) { + return false; + } else { + return retrying; + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index d82aaba..96af154 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -51,7 +51,6 @@ import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.schema.tuple.PositionBasedResultTuple; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; -import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TupleUtil; @@ -122,8 +121,7 @@ public class HashJoinRegionScanner implements RegionScanner { } HashCache hashCache = (HashCache)cache.getServerCache(joinId); if (hashCache == null) { - Exception cause = new HashJoinCacheNotFoundException( - Bytes.toLong(ByteUtil.copyKeyBytesIfNecessary(joinId))); + Exception cause = new HashJoinCacheNotFoundException(Bytes.toLong(joinId.get())); throw new DoNotRetryIOException(cause.getMessage(), cause); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java index 9d78659..86219c7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java @@ -75,7 +75,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance(); tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()), cachePtr, txState, cacheFactory, request.hasHasProtoBufIndexMaintainer() && request.getHasProtoBufIndexMaintainer(), - request.hasClientVersion() ? request.getClientVersion() : ScanUtil.UNKNOWN_CLIENT_VERSION); + request.getUsePersistentCache(), request.hasClientVersion() ? request.getClientVersion() : ScanUtil.UNKNOWN_CLIENT_VERSION); } catch (Throwable e) { ProtobufUtil.setControllerException(controller, ServerUtil.createIOException("Error when adding cache: ", e)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java index fdca334..c42b9df 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java @@ -5760,6 +5760,16 @@ public final class ServerCachingProtos { * <code>optional int32 clientVersion = 7;</code> */ int getClientVersion(); + + // optional bool usePersistentCache = 8; + /** + * <code>optional bool usePersistentCache = 8;</code> + */ + boolean hasUsePersistentCache(); + /** + * <code>optional bool usePersistentCache = 8;</code> + */ + boolean getUsePersistentCache(); } /** * Protobuf type {@code AddServerCacheRequest} @@ -5863,6 +5873,11 @@ public final class ServerCachingProtos { clientVersion_ = input.readInt32(); break; } + case 64: { + bitField0_ |= 0x00000080; + usePersistentCache_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -6027,6 +6042,22 @@ public final class ServerCachingProtos { return clientVersion_; } + // optional bool usePersistentCache = 8; + public static final int USEPERSISTENTCACHE_FIELD_NUMBER = 8; + private boolean usePersistentCache_; + /** + * <code>optional bool usePersistentCache = 8;</code> + */ + public boolean hasUsePersistentCache() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * <code>optional bool usePersistentCache = 8;</code> + */ + public boolean getUsePersistentCache() { + return usePersistentCache_; + } + private void initFields() { tenantId_ = com.google.protobuf.ByteString.EMPTY; cacheId_ = com.google.protobuf.ByteString.EMPTY; @@ -6035,6 +6066,7 @@ public final class ServerCachingProtos { txState_ = com.google.protobuf.ByteString.EMPTY; hasProtoBufIndexMaintainer_ = false; clientVersion_ = 0; + usePersistentCache_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -6089,6 +6121,9 @@ public final class ServerCachingProtos { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeInt32(7, clientVersion_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeBool(8, usePersistentCache_); + } getUnknownFields().writeTo(output); } @@ -6126,6 +6161,10 @@ public final class ServerCachingProtos { size += com.google.protobuf.CodedOutputStream .computeInt32Size(7, clientVersion_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(8, usePersistentCache_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -6184,6 +6223,11 @@ public final class ServerCachingProtos { result = result && (getClientVersion() == other.getClientVersion()); } + result = result && (hasUsePersistentCache() == other.hasUsePersistentCache()); + if (hasUsePersistentCache()) { + result = result && (getUsePersistentCache() + == other.getUsePersistentCache()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -6225,6 +6269,10 @@ public final class ServerCachingProtos { hash = (37 * hash) + CLIENTVERSION_FIELD_NUMBER; hash = (53 * hash) + getClientVersion(); } + if (hasUsePersistentCache()) { + hash = (37 * hash) + USEPERSISTENTCACHE_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getUsePersistentCache()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -6358,6 +6406,8 @@ public final class ServerCachingProtos { bitField0_ = (bitField0_ & ~0x00000020); clientVersion_ = 0; bitField0_ = (bitField0_ & ~0x00000040); + usePersistentCache_ = false; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -6422,6 +6472,10 @@ public final class ServerCachingProtos { to_bitField0_ |= 0x00000040; } result.clientVersion_ = clientVersion_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000080; + } + result.usePersistentCache_ = usePersistentCache_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -6459,6 +6513,9 @@ public final class ServerCachingProtos { if (other.hasClientVersion()) { setClientVersion(other.getClientVersion()); } + if (other.hasUsePersistentCache()) { + setUsePersistentCache(other.getUsePersistentCache()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6914,6 +6971,39 @@ public final class ServerCachingProtos { return this; } + // optional bool usePersistentCache = 8; + private boolean usePersistentCache_ ; + /** + * <code>optional bool usePersistentCache = 8;</code> + */ + public boolean hasUsePersistentCache() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * <code>optional bool usePersistentCache = 8;</code> + */ + public boolean getUsePersistentCache() { + return usePersistentCache_; + } + /** + * <code>optional bool usePersistentCache = 8;</code> + */ + public Builder setUsePersistentCache(boolean value) { + bitField0_ |= 0x00000080; + usePersistentCache_ = value; + onChanged(); + return this; + } + /** + * <code>optional bool usePersistentCache = 8;</code> + */ + public Builder clearUsePersistentCache() { + bitField0_ = (bitField0_ & ~0x00000080); + usePersistentCache_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:AddServerCacheRequest) } @@ -8723,22 +8813,23 @@ public final class ServerCachingProtos { "\timmutable\030\022 \002(\010\022&\n\021indexedColumnInfo\030\023 " + "\003(\0132\013.ColumnInfo\022\026\n\016encodingScheme\030\024 \002(\005" + "\022\036\n\026immutableStorageScheme\030\025 \002(\005\022\025\n\rview" + - "IndexType\030\026 \001(\005\"\334\001\n\025AddServerCacheReques" + + "IndexType\030\026 \001(\005\"\370\001\n\025AddServerCacheReques" + "t\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\022)\n\010" + "cachePtr\030\003 \002(\0132\027.ImmutableBytesWritable\022" + ")\n\014cacheFactory\030\004 \002(\0132\023.ServerCacheFacto" + "ry\022\017\n\007txState\030\005 \001(\014\022\"\n\032hasProtoBufIndexM" + - "aintainer\030\006 \001(\010\022\025\n\rclientVersion\030\007 \001(\005\"(", - "\n\026AddServerCacheResponse\022\016\n\006return\030\001 \002(\010" + - "\"=\n\030RemoveServerCacheRequest\022\020\n\010tenantId" + - "\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\"+\n\031RemoveServerC" + - "acheResponse\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerC" + - "achingService\022A\n\016addServerCache\022\026.AddSer" + - "verCacheRequest\032\027.AddServerCacheResponse" + - "\022J\n\021removeServerCache\022\031.RemoveServerCach" + - "eRequest\032\032.RemoveServerCacheResponseBG\n(" + - "org.apache.phoenix.coprocessor.generated" + - "B\023ServerCachingProtosH\001\210\001\001\240\001\001" + "aintainer\030\006 \001(\010\022\025\n\rclientVersion\030\007 \001(\005\022\032", + "\n\022usePersistentCache\030\010 \001(\010\"(\n\026AddServerC" + + "acheResponse\022\016\n\006return\030\001 \002(\010\"=\n\030RemoveSe" + + "rverCacheRequest\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007ca" + + "cheId\030\002 \002(\014\"+\n\031RemoveServerCacheResponse" + + "\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerCachingServic" + + "e\022A\n\016addServerCache\022\026.AddServerCacheRequ" + + "est\032\027.AddServerCacheResponse\022J\n\021removeSe" + + "rverCache\022\031.RemoveServerCacheRequest\032\032.R" + + "emoveServerCacheResponseBG\n(org.apache.p" + + "hoenix.coprocessor.generatedB\023ServerCach", + "ingProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -8774,7 +8865,7 @@ public final class ServerCachingProtos { internal_static_AddServerCacheRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AddServerCacheRequest_descriptor, - new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", "TxState", "HasProtoBufIndexMaintainer", "ClientVersion", }); + new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", "TxState", "HasProtoBufIndexMaintainer", "ClientVersion", "UsePersistentCache", }); internal_static_AddServerCacheResponse_descriptor = getDescriptor().getMessageTypes().get(5); internal_static_AddServerCacheResponse_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index bfe089d..b5cd6b1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -22,21 +22,23 @@ import static org.apache.phoenix.util.LogUtil.addCustomAnnotations; import static org.apache.phoenix.util.NumberUtil.add; import static org.apache.phoenix.util.NumberUtil.getMin; +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.sql.SQLException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.codec.binary.Hex; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.ExplainPlan; @@ -46,6 +48,7 @@ import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.WhereCompiler; +import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor; @@ -57,9 +60,7 @@ import org.apache.phoenix.expression.InListExpression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.RowValueConstructorExpression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.iterate.FilterResultIterator; -import org.apache.phoenix.iterate.ParallelScanGrouper; -import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.iterate.*; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.join.HashCacheClient; @@ -86,9 +87,11 @@ import org.apache.phoenix.util.SQLCloseables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.phoenix.util.ServerUtil; public class HashJoinPlan extends DelegateQueryPlan { private static final Log LOG = LogFactory.getLog(HashJoinPlan.class); + private static final Random RANDOM = new Random(); private final SelectStatement statement; private final HashJoinInfo joinInfo; @@ -105,6 +108,7 @@ public class HashJoinPlan extends DelegateQueryPlan { private Long estimatedBytes; private Long estimateInfoTs; private boolean getEstimatesCalled; + private boolean hasSubPlansWithPersistentCache; public static HashJoinPlan create(SelectStatement statement, QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) throws SQLException { @@ -134,8 +138,12 @@ public class HashJoinPlan extends DelegateQueryPlan { this.recompileWhereClause = recompileWhereClause; this.tableRefs = Sets.newHashSetWithExpectedSize(subPlans.length + plan.getSourceRefs().size()); this.tableRefs.addAll(plan.getSourceRefs()); + this.hasSubPlansWithPersistentCache = false; for (SubPlan subPlan : subPlans) { tableRefs.addAll(subPlan.getInnerPlan().getSourceRefs()); + if (subPlan instanceof HashSubPlan && ((HashSubPlan)subPlan).usePersistentCache) { + this.hasSubPlansWithPersistentCache = true; + } } QueryServices services = plan.getContext().getConnection().getQueryServices(); this.maxServerCacheTimeToLive = services.getProps().getInt( @@ -214,7 +222,7 @@ public class HashJoinPlan extends DelegateQueryPlan { SQLCloseables.closeAllQuietly(dependencies.values()); throw firstException; } - + Expression postFilter = null; boolean hasKeyRangeExpressions = keyRangeExpressions != null && !keyRangeExpressions.isEmpty(); if (recompileWhereClause || hasKeyRangeExpressions) { @@ -241,8 +249,35 @@ public class HashJoinPlan extends DelegateQueryPlan { if (statement.getInnerSelectStatement() != null && postFilter != null) { iterator = new FilterResultIterator(iterator, postFilter); } - - return iterator; + + if (hasSubPlansWithPersistentCache) { + return peekForPersistentCache(iterator, scanGrouper, scan); + } else { + return iterator; + } + } + + private ResultIterator peekForPersistentCache(ResultIterator iterator, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + // The persistent subquery is optimistic and assumes caches are present on region + // servers. We verify that this is the case by peeking at one result. If there is + // a cache missing exception, we retry the query with the persistent cache disabled + // for that specific cache ID. + PeekingResultIterator peeking = LookAheadResultIterator.wrap(iterator); + try { + peeking.peek(); + } catch (Exception e) { + try { + throw ServerUtil.parseServerException(e); + } catch (HashJoinCacheNotFoundException e2) { + Long cacheId = e2.getCacheId(); + if (delegate.getContext().getRetryingPersistentCache(cacheId)) { + throw e2; + } + delegate.getContext().setRetryingPersistentCache(cacheId); + return iterator(scanGrouper, scan); + } + } + return peeking; } private Expression createKeyRangeExpression(Expression lhsExpression, @@ -467,20 +502,29 @@ public class HashJoinPlan extends DelegateQueryPlan { private final QueryPlan plan; private final List<Expression> hashExpressions; private final boolean singleValueOnly; + private final boolean usePersistentCache; private final Expression keyRangeLhsExpression; private final Expression keyRangeRhsExpression; + private final MessageDigest digest; public HashSubPlan(int index, QueryPlan subPlan, List<Expression> hashExpressions, boolean singleValueOnly, + boolean usePersistentCache, Expression keyRangeLhsExpression, Expression keyRangeRhsExpression) { this.index = index; this.plan = subPlan; this.hashExpressions = hashExpressions; this.singleValueOnly = singleValueOnly; + this.usePersistentCache = usePersistentCache; this.keyRangeLhsExpression = keyRangeLhsExpression; this.keyRangeRhsExpression = keyRangeRhsExpression; + try { + this.digest = MessageDigest.getInstance("SHA-256"); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } } @Override @@ -494,19 +538,37 @@ public class HashJoinPlan extends DelegateQueryPlan { if (hashExpressions != null) { ResultIterator iterator = plan.iterator(); try { - cache = - parent.hashClient.addHashCache(ranges, iterator, - plan.getEstimatedSize(), hashExpressions, singleValueOnly, + final byte[] cacheId; + String queryString = plan.getStatement().toString().replaceAll("\\$[0-9]+", "\\$"); + if (usePersistentCache) { + cacheId = Arrays.copyOfRange(digest.digest(queryString.getBytes()), 0, 8); + boolean retrying = parent.delegate.getContext().getRetryingPersistentCache(Bytes.toLong(cacheId)); + if (!retrying) { + try { + cache = parent.hashClient.createServerCache(cacheId, parent.delegate); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } else { + cacheId = Bytes.toBytes(RANDOM.nextLong()); + } + LOG.debug("Using cache ID " + Hex.encodeHexString(cacheId) + " for " + queryString); + if (cache == null) { + LOG.debug("Making RPC to add cache " + Hex.encodeHexString(cacheId)); + cache = parent.hashClient.addHashCache(ranges, cacheId, iterator, + plan.getEstimatedSize(), hashExpressions, singleValueOnly, usePersistentCache, parent.delegate.getTableRef().getTable(), keyRangeRhsExpression, keyRangeRhsValues); - long endTime = System.currentTimeMillis(); - boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime); - if (!isSet && (endTime - - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) { - LOG.warn(addCustomAnnotations( - "Hash plan [" + index - + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.", - parent.delegate.getContext().getConnection())); + long endTime = System.currentTimeMillis(); + boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime); + if (!isSet && (endTime + - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) { + LOG.warn(addCustomAnnotations( + "Hash plan [" + index + + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.", + parent.delegate.getContext().getConnection())); + } } } finally { iterator.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index d890383..2378175 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -1309,8 +1309,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result throw e2; } Long cacheId = ((HashJoinCacheNotFoundException)e2).getCacheId(); - if (!hashCacheClient.addHashCacheToServer(startKey, - caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))), plan.getTableRef().getTable())) { throw e2; } + ServerCache cache = caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))); + if (cache .getCachePtr() != null) { + if (!hashCacheClient.addHashCacheToServer(startKey, cache, plan.getTableRef().getTable())) { + throw e2; + } + } } concatIterators = recreateIterators(services, isLocalIndex, allIterators, http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 06f612a..f1d1663 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -194,13 +194,11 @@ public class TableResultIterator implements ResultIterator { if (retry <= 0) { throw e1; } + Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId(); retry--; try { - Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId(); - ServerCache cache = caches == null ? null : - caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))); - + caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))); if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(), cache, plan.getTableRef().getTable())) { throw e1; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java index 83ac32d..315c515 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; @@ -67,6 +68,20 @@ public class HashCacheClient { } /** + * Creates a ServerCache object for cacheId. This is used for persistent cache, and there may or may not + * be corresponding data on each region server. + * @param cacheId ID for the cache entry + * @param delegate the query plan this will be used for + * @return client-side {@link ServerCache} representing the hash cache that may or may not be present on region servers. + * @throws SQLException + * size + */ + public ServerCache createServerCache(final byte[] cacheId, QueryPlan delegate) + throws SQLException, IOException { + return serverCache.createServerCache(cacheId, delegate); + } + + /** * Send the results of scanning through the scanner to all * region servers for regions of the table that will use the cache * that intersect with the minMaxKeyRange. @@ -76,13 +91,16 @@ public class HashCacheClient { * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed * size */ - public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, PTable cacheUsingTable, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException { + public ServerCache addHashCache( + ScanRanges keyRanges, byte[] cacheId, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, + boolean singleValueOnly, boolean usePersistentCache, PTable cacheUsingTable, Expression keyRangeRhsExpression, + List<Expression> keyRangeRhsValues) throws SQLException { /** * Serialize and compress hashCacheTable */ ImmutableBytesWritable ptr = new ImmutableBytesWritable(); serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues); - ServerCache cache = serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTable, true); + ServerCache cache = serverCache.addServerCache(keyRanges, cacheId, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTable, usePersistentCache, true); return cache; } @@ -90,7 +108,7 @@ public class HashCacheClient { * Should only be used to resend the hash table cache to the regionserver. * * @param startkeyOfRegion start key of any region hosted on a regionserver which needs hash cache - * @param cacheId Id of the cache which needs to be sent + * @param cache The cache which needs to be sent * @param pTable * @return * @throws Exception http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java index 4fc3c70..ecf9d57 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import net.jcip.annotations.Immutable; @@ -139,6 +140,16 @@ public class HashCacheFactory implements ServerCacheFactory { } @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + Set<ImmutableBytesPtr> keySet = hashCache.keySet(); + for (ImmutableBytesPtr key : keySet) { + sb.append("key: " + key + " value: " + hashCache.get(key)); + } + return sb.toString(); + } + + @Override public void close() { memoryChunk.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java index 02a44ad..8a83116 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java @@ -83,6 +83,10 @@ public class HintNode { */ USE_SORT_MERGE_JOIN, /** + * Persist the RHS results of a hash join. + */ + USE_PERSISTENT_CACHE, + /** * Avoid using star-join optimization. Used for broadcast join (hash join) only. */ NO_STAR_JOIN, http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index d681a13..d1b277a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -90,6 +90,7 @@ public interface QueryServices extends SQLCloseable { public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize"; public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes"; public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs"; + public static final String MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCachePersistenceTimeToLiveMs"; @Deprecated // Use FORCE_ROW_KEY_ORDER instead. public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB = "phoenix.query.rowKeyOrderSaltedTable"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 76e79fa..35dbe3a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -159,6 +159,7 @@ public class QueryServicesOptions { public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 2097152; // The only downside of it being out-of-sync is that the parallelization of the scan won't be as balanced as it could be. public static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 30000; // 30 sec (with no activity) + public static final int DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS = 30 * 60000; // 30 minutes public static final int DEFAULT_SCAN_CACHE_SIZE = 1000; public static final int DEFAULT_MAX_INTRA_REGION_PARALLELIZATION = DEFAULT_MAX_QUERY_CONCURRENCY; public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java index c741f4e..3c8a269 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java @@ -17,9 +17,6 @@ */ package org.apache.phoenix.cache; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - import java.io.Closeable; import java.io.DataInput; import java.io.DataOutput; @@ -38,49 +35,53 @@ import org.junit.Test; import com.google.common.base.Ticker; +import static org.junit.Assert.*; + public class TenantCacheTest { @Test public void testInvalidateClosesMemoryChunk() throws SQLException { int maxServerCacheTimeToLive = 10000; + int maxServerCachePersistenceTimeToLive = 10; long maxBytes = 1000; GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes); - TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive); - ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes("a")); + TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive); + ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes(1L)); ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a")); - newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION); + newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION); assertEquals(maxBytes-1, memoryManager.getAvailableMemory()); newTenantCache.removeServerCache(cacheId); assertEquals(maxBytes, memoryManager.getAvailableMemory()); } - + @Test public void testTimeoutClosesMemoryChunk() throws Exception { int maxServerCacheTimeToLive = 10; + int maxServerCachePersistenceTimeToLive = 10; long maxBytes = 1000; GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes); ManualTicker ticker = new ManualTicker(); - TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker); - ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a")); + TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker); + ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L)); ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a")); - cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION); + cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION); assertEquals(maxBytes-1, memoryManager.getAvailableMemory()); ticker.time += (maxServerCacheTimeToLive + 1) * 1000000; cache.cleanUp(); assertEquals(maxBytes, memoryManager.getAvailableMemory()); } - @Test public void testFreeMemoryOnAccess() throws Exception { int maxServerCacheTimeToLive = 10; + int maxServerCachePersistenceTimeToLive = 10; long maxBytes = 1000; GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes); ManualTicker ticker = new ManualTicker(); - TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker); - ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a")); + TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker); + ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L)); ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a")); - cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION); + cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION); assertEquals(maxBytes-1, memoryManager.getAvailableMemory()); ticker.time += (maxServerCacheTimeToLive + 1) * 1000000; assertNull(cache.getServerCache(cacheId1)); @@ -90,17 +91,92 @@ public class TenantCacheTest { @Test public void testExpiredCacheOnAddingNew() throws Exception { int maxServerCacheTimeToLive = 10; + int maxServerCachePersistenceTimeToLive = 10; long maxBytes = 10; GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes); ManualTicker ticker = new ManualTicker(); - TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker); - ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a")); + TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker); + ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L)); ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("12345678")); - cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION); + cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION); assertEquals(2, memoryManager.getAvailableMemory()); ticker.time += (maxServerCacheTimeToLive + 1) * 1000000; - cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION); + cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, false, MetaDataProtocol.PHOENIX_VERSION); + assertEquals(2, memoryManager.getAvailableMemory()); + } + + @Test + public void testExpiresButStaysInPersistentAfterTimeout() throws Exception { + int maxServerCacheTimeToLive = 100; + int maxServerCachePersistenceTimeToLive = 1000; + long maxBytes = 1000; + GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes); + ManualTicker ticker = new ManualTicker(); + TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker); + ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L)); + ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a")); + cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, true, MetaDataProtocol.PHOENIX_VERSION); + assertEquals(maxBytes-1, memoryManager.getAvailableMemory()); + assertNotNull(cache.getServerCache(cacheId1)); + + // Expire it from live cache but not persistent cache + ticker.time += (maxServerCacheTimeToLive + 1) * 1000000; + cache.cleanUp(); + assertEquals(maxBytes-1, memoryManager.getAvailableMemory()); + assertNotNull(cache.getServerCache(cacheId1)); + + // Expire it from persistent cache as well + ticker.time += (maxServerCachePersistenceTimeToLive + 1) * 1000000; + cache.cleanUp(); + assertEquals(maxBytes, memoryManager.getAvailableMemory()); + assertNull(cache.getServerCache(cacheId1)); + } + + @Test + public void testExpiresButStaysInPersistentAfterRemove() throws Exception { + int maxServerCacheTimeToLive = 100; + int maxServerCachePersistenceTimeToLive = 1000; + long maxBytes = 1000; + GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes); + ManualTicker ticker = new ManualTicker(); + TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker); + ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L)); + ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("12")); + cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, true, MetaDataProtocol.PHOENIX_VERSION); + assertEquals(maxBytes-2, memoryManager.getAvailableMemory()); + assertNotNull(cache.getServerCache(cacheId1)); + + // Remove should only remove from live cache + cache.removeServerCache(cacheId1); + assertEquals(maxBytes-2, memoryManager.getAvailableMemory()); + assertNotNull(cache.getServerCache(cacheId1)); + } + + @Test + public void testEvictPersistentCacheIfSpaceIsNeeded() throws Exception { + int maxServerCacheTimeToLive = 100; + int maxServerCachePersistenceTimeToLive = 1000; + long maxBytes = 10; + GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes); + ManualTicker ticker = new ManualTicker(); + TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, maxServerCachePersistenceTimeToLive, ticker); + ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes(1L)); + ImmutableBytesWritable cachePtr1 = new ImmutableBytesWritable(Bytes.toBytes("1234")); + cache.addServerCache(cacheId1, cachePtr1, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, true, MetaDataProtocol.PHOENIX_VERSION); + assertEquals(6, memoryManager.getAvailableMemory()); + + // Remove it, but it should stay in persistent cache + cache.removeServerCache(cacheId1); + assertNotNull(cache.getServerCache(cacheId1)); + assertEquals(6, memoryManager.getAvailableMemory()); + + // Let's do an entry that will require eviction + ImmutableBytesPtr cacheId2 = new ImmutableBytesPtr(Bytes.toBytes(2L)); + ImmutableBytesWritable cachePtr2 = new ImmutableBytesWritable(Bytes.toBytes("12345678")); + cache.addServerCache(cacheId2, cachePtr2, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, true, MetaDataProtocol.PHOENIX_VERSION); assertEquals(2, memoryManager.getAvailableMemory()); + assertNull(cache.getServerCache(cacheId1)); + assertNotNull(cache.getServerCache(cacheId2)); } public static class ManualTicker extends Ticker { http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb697933/phoenix-protocol/src/main/ServerCachingService.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/ServerCachingService.proto b/phoenix-protocol/src/main/ServerCachingService.proto index d92f2cd..0d2d1d2 100644 --- a/phoenix-protocol/src/main/ServerCachingService.proto +++ b/phoenix-protocol/src/main/ServerCachingService.proto @@ -73,6 +73,7 @@ message AddServerCacheRequest { optional bytes txState = 5; optional bool hasProtoBufIndexMaintainer = 6; optional int32 clientVersion = 7; + optional bool usePersistentCache = 8; } message AddServerCacheResponse {