This is an automated email from the ASF dual-hosted git repository. karanmehta93 pushed a commit to branch 4.x-HBase-1.4 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push: new ffbfc80 PHOENIX-5069 Use asynchronous refresh to provide non-blocking Phoenix Stats Client Cache ffbfc80 is described below commit ffbfc8012027fa399bdba9ee4ab1074fc30229cb Author: Bin <b...@salesforce.com> AuthorDate: Tue Feb 5 15:37:21 2019 -0800 PHOENIX-5069 Use asynchronous refresh to provide non-blocking Phoenix Stats Client Cache --- .../org/apache/phoenix/query/GuidePostsCache.java | 86 ++++++++---- .../phoenix/query/PhoenixStatsCacheLoader.java | 90 ++++++++++++ .../apache/phoenix/query/PhoenixStatsLoader.java | 57 ++++++++ .../org/apache/phoenix/query/QueryServices.java | 4 +- .../apache/phoenix/query/QueryServicesOptions.java | 7 + .../phoenix/schema/stats/GuidePostsInfo.java | 6 +- .../phoenix/query/PhoenixStatsCacheLoaderTest.java | 156 +++++++++++++++++++++ 7 files changed, 378 insertions(+), 28 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/GuidePostsCache.java b/phoenix-core/src/main/java/org/apache/phoenix/query/GuidePostsCache.java index 066dd5f..2c2697a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/GuidePostsCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/GuidePostsCache.java @@ -22,12 +22,12 @@ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_STATS_COLLEC import java.io.IOException; import java.util.List; import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; @@ -42,13 +42,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalCause; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import com.google.common.cache.Weigher; + /** * "Client-side" cache for storing {@link GuidePostsInfo} for a column family. Intended to decouple * Phoenix from a specific version of Guava's cache. @@ -58,21 +58,29 @@ public class GuidePostsCache { private final ConnectionQueryServices queryServices; private final LoadingCache<GuidePostsKey, GuidePostsInfo> cache; + private ExecutorService executor = null; public GuidePostsCache(ConnectionQueryServices queryServices, Configuration config) { this.queryServices = Objects.requireNonNull(queryServices); + // Number of millis to expire cache values after write final long statsUpdateFrequency = config.getLong( QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS); - // Maximum number of entries (tables) to store in the cache at one time + + // Maximum total weight (size in bytes) of stats entries final long maxTableStatsCacheSize = config.getLong( QueryServices.STATS_MAX_CACHE_SIZE, QueryServicesOptions.DEFAULT_STATS_MAX_CACHE_SIZE); + final boolean isStatsEnabled = config.getBoolean(STATS_COLLECTION_ENABLED, DEFAULT_STATS_COLLECTION_ENABLED); + + PhoenixStatsCacheLoader cacheLoader = new PhoenixStatsCacheLoader( + isStatsEnabled ? 
new StatsLoaderImpl() : new EmptyStatsLoader(), config); + cache = CacheBuilder.newBuilder() - // Expire entries a given amount of time after they were written - .expireAfterWrite(statsUpdateFrequency, TimeUnit.MILLISECONDS) + // Refresh entries a given amount of time after they were written + .refreshAfterWrite(statsUpdateFrequency, TimeUnit.MILLISECONDS) // Maximum total weight (size in bytes) of stats entries .maximumWeight(maxTableStatsCacheSize) // Defer actual size to the PTableStats.getEstimatedSize() @@ -84,18 +92,37 @@ public class GuidePostsCache { // Log removals at TRACE for debugging .removalListener(new PhoenixStatsCacheRemovalListener()) // Automatically load the cache when entries are missing - .build(isStatsEnabled ? new StatsLoader() : new EmptyStatsLoader()); + .build(cacheLoader); } /** - * {@link CacheLoader} implementation for the Phoenix Table Stats cache. + * {@link PhoenixStatsLoader} implementation for the Stats Loader. */ - protected class StatsLoader extends CacheLoader<GuidePostsKey, GuidePostsInfo> { + protected class StatsLoaderImpl implements PhoenixStatsLoader { + @Override + public boolean needsLoad() { + // For now, whenever it's called, we try to load stats from stats table + // no matter it has been updated or not. + // Here are the possible optimizations we can do here: + // 1. Load stats from the stats table only when the stats get updated on the server side. + // 2. Support different refresh cycle for different tables. 
+ return true; + } + + @Override + public GuidePostsInfo loadStats(GuidePostsKey statsKey) throws Exception { + return loadStats(statsKey, GuidePostsInfo.NO_GUIDEPOST); + } + @Override - public GuidePostsInfo load(GuidePostsKey statsKey) throws Exception { - Table statsHTable = queryServices.getTable(SchemaUtil.getPhysicalName( + public GuidePostsInfo loadStats(GuidePostsKey statsKey, GuidePostsInfo prevGuidepostInfo) throws Exception { + assert(prevGuidepostInfo != null); + + TableName tableName = SchemaUtil.getPhysicalName( PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, - queryServices.getProps()).getName()); + queryServices.getProps()); + Table statsHTable = queryServices.getTable(tableName.getName()); + try { GuidePostsInfo guidePostsInfo = StatisticsUtil.readStatistics(statsHTable, statsKey, HConstants.LATEST_TIMESTAMP); @@ -103,18 +130,17 @@ public class GuidePostsCache { return guidePostsInfo; } catch (TableNotFoundException e) { // On a fresh install, stats might not yet be created, don't warn about this. - logger.debug("Unable to locate Phoenix stats table", e); - return GuidePostsInfo.NO_GUIDEPOST; + logger.debug("Unable to locate Phoenix stats table: " + tableName.toString(), e); + return prevGuidepostInfo; } catch (IOException e) { - logger.warn("Unable to read from stats table", e); - // Just cache empty stats. We'll try again after some time anyway. - return GuidePostsInfo.NO_GUIDEPOST; + logger.warn("Unable to read from stats table: " + tableName.toString(), e); + return prevGuidepostInfo; } finally { try { statsHTable.close(); } catch (IOException e) { // Log, but continue. We have our stats anyway now. 
- logger.warn("Unable to close stats table", e); + logger.warn("Unable to close stats table: " + tableName.toString(), e); } } } @@ -125,20 +151,30 @@ public class GuidePostsCache { void traceStatsUpdate(GuidePostsKey key, GuidePostsInfo info) { if (logger.isTraceEnabled()) { logger.trace("Updating local TableStats cache (id={}) for {}, size={}bytes", - new Object[] {Objects.hashCode(GuidePostsCache.this), key, - info.getEstimatedSize()}); + new Object[] {Objects.hashCode(GuidePostsCache.this), key, info.getEstimatedSize()}); } } } /** + * {@link PhoenixStatsLoader} implementation for the Stats Loader. * Empty stats loader if stats are disabled */ - protected class EmptyStatsLoader extends CacheLoader<GuidePostsKey, GuidePostsInfo> { - @Override - public GuidePostsInfo load(GuidePostsKey statsKey) throws Exception { - return GuidePostsInfo.NO_GUIDEPOST; - } + protected class EmptyStatsLoader implements PhoenixStatsLoader { + @Override + public boolean needsLoad() { + return false; + } + + @Override + public GuidePostsInfo loadStats(GuidePostsKey statsKey) throws Exception { + return GuidePostsInfo.NO_GUIDEPOST; + } + + @Override + public GuidePostsInfo loadStats(GuidePostsKey statsKey, GuidePostsInfo prevGuidepostInfo) throws Exception { + return GuidePostsInfo.NO_GUIDEPOST; + } } /** diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/PhoenixStatsCacheLoader.java b/phoenix-core/src/main/java/org/apache/phoenix/query/PhoenixStatsCacheLoader.java new file mode 100644 index 0000000..98e9587 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/PhoenixStatsCacheLoader.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.query; + +import com.google.common.cache.CacheLoader; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.schema.stats.GuidePostsInfo; +import org.apache.phoenix.schema.stats.GuidePostsKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * {@link CacheLoader} implementation for the Phoenix Table Stats cache. 
+ */ +public class PhoenixStatsCacheLoader extends CacheLoader<GuidePostsKey, GuidePostsInfo> { + private static final Logger logger = LoggerFactory.getLogger(PhoenixStatsCacheLoader.class); + + final private PhoenixStatsLoader statsLoader; + private static volatile ExecutorService executor; + + public PhoenixStatsCacheLoader(PhoenixStatsLoader statsLoader, Configuration config) { + this.statsLoader = statsLoader; + + if (this.executor == null) { + synchronized (PhoenixStatsCacheLoader.class) { + if (this.executor == null) { + // The size of the thread pool used for refreshing cached table stats + final int statsCacheThreadPoolSize = config.getInt( + QueryServices.STATS_CACHE_THREAD_POOL_SIZE, + QueryServicesOptions.DEFAULT_STATS_CACHE_THREAD_POOL_SIZE); + + this.executor = Executors.newFixedThreadPool(statsCacheThreadPoolSize); + } + } + } + } + + @Override + public GuidePostsInfo load(GuidePostsKey statsKey) throws Exception { + return statsLoader.loadStats(statsKey); + } + + @Override + public ListenableFuture<GuidePostsInfo> reload( + final GuidePostsKey key, + final GuidePostsInfo prevGuidepostInfo) + { + if (statsLoader.needsLoad()) { + // schedule asynchronous task + ListenableFutureTask<GuidePostsInfo> task = + ListenableFutureTask.create(new Callable<GuidePostsInfo>() { + public GuidePostsInfo call() { + try { + return statsLoader.loadStats(key, prevGuidepostInfo); + } catch (Exception e) { + logger.warn("Unable to load stats from table: " + key.toString(), e); + return prevGuidepostInfo; + } + } + }); + executor.execute(task); + return task; + } + else { + return Futures.immediateFuture(prevGuidepostInfo); + } + } +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/PhoenixStatsLoader.java b/phoenix-core/src/main/java/org/apache/phoenix/query/PhoenixStatsLoader.java new file mode 100644 index 0000000..eda5e56 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/PhoenixStatsLoader.java @@ -0,0 
+1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.query; + +import org.apache.phoenix.schema.stats.GuidePostsInfo; +import org.apache.phoenix.schema.stats.GuidePostsKey; + +/** + * The interface for talking to underneath layers to load stats from stats table for a given key + */ +public interface PhoenixStatsLoader { + /** + * Use to check whether this is the time to load stats from stats table. + * There are two cases: + * a. After a specified duration has passed + * b. The stats on server side (e.g. 
in stats table) have been updated + * + * @return boolean indicates whether we need to load stats or not + */ + boolean needsLoad(); + + /** + * Called by client stats cache to load stats from underneath layers + * + * @param statsKey the stats key used to search the stats on server side (in stats table) + * @throws Exception + * + * @return GuidePostsInfo retrieved from server side + */ + GuidePostsInfo loadStats(GuidePostsKey statsKey) throws Exception; + + /** + * Called by client stats cache to load stats from underneath layers + * + * @param statsKey the stats key used to search the stats on server side (in stats table) + * @param prevGuidepostInfo the existing stats cached on the client side or GuidePostsInfo.NO_GUIDEPOST + * @throws Exception + * + * @return GuidePostsInfo retrieved from server side + */ + GuidePostsInfo loadStats(GuidePostsKey statsKey, GuidePostsInfo prevGuidepostInfo) throws Exception; +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index e279f05..fc11539 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -197,8 +197,10 @@ public interface QueryServices extends SQLCloseable { public static final String COMMIT_STATS_ASYNC = "phoenix.stats.commit.async"; // Maximum size in bytes taken up by cached table stats in the client public static final String STATS_MAX_CACHE_SIZE = "phoenix.stats.cache.maxSize"; - public static final String LOG_SALT_BUCKETS_ATTRIB = "phoenix.log.saltBuckets"; + // The size of the thread pool used for refreshing cached table stats in stats client cache + public static final String STATS_CACHE_THREAD_POOL_SIZE = "phoenix.stats.cache.threadPoolSize"; + public static final String LOG_SALT_BUCKETS_ATTRIB = "phoenix.log.saltBuckets"; public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = 
"phoenix.sequence.saltBuckets"; public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority"; public static final String EXPLAIN_CHUNK_COUNT_ATTRIB = "phoenix.explain.displayChunkCount"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 4407530..fcf57c7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -86,6 +86,7 @@ import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRI import static org.apache.phoenix.query.QueryServices.STATS_COLLECTION_ENABLED; import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB; +import static org.apache.phoenix.query.QueryServices.STATS_CACHE_THREAD_POOL_SIZE; import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB; @@ -247,6 +248,7 @@ public class QueryServicesOptions { public static final long DEFAULT_STATS_MAX_CACHE_SIZE = 256 * 1024 * 1024; // Allow stats collection to be initiated by client multiple times immediately public static final int DEFAULT_MIN_STATS_UPDATE_FREQ_MS = 0; + public static final int DEFAULT_STATS_CACHE_THREAD_POOL_SIZE = 4; public static final boolean DEFAULT_USE_REVERSE_SCAN = true; @@ -414,6 +416,7 @@ public class QueryServicesOptions { .setIfUnset(DATE_FORMAT_TIMEZONE_ATTRIB, DEFAULT_DATE_FORMAT_TIMEZONE) .setIfUnset(STATS_UPDATE_FREQ_MS_ATTRIB, DEFAULT_STATS_UPDATE_FREQ_MS) .setIfUnset(MIN_STATS_UPDATE_FREQ_MS_ATTRIB, DEFAULT_MIN_STATS_UPDATE_FREQ_MS) + .setIfUnset(STATS_CACHE_THREAD_POOL_SIZE, 
DEFAULT_STATS_CACHE_THREAD_POOL_SIZE) .setIfUnset(CALL_QUEUE_ROUND_ROBIN_ATTRIB, DEFAULT_CALL_QUEUE_ROUND_ROBIN) .setIfUnset(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE) .setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER) @@ -736,6 +739,10 @@ public class QueryServicesOptions { return set(MIN_STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs); } + public QueryServicesOptions setStatsCacheThreadPoolSize(int threadPoolSize) { + return set(STATS_CACHE_THREAD_POOL_SIZE, threadPoolSize); + } + public QueryServicesOptions setSequenceSaltBuckets(int saltBuckets) { config.setInt(SEQUENCE_SALT_BUCKETS_ATTRIB, saltBuckets); return this; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java index 04c69bf..9492a35 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java @@ -95,15 +95,17 @@ public class GuidePostsInfo { this.guidePostsCount = guidePostsCount; this.rowCounts = Longs.toArray(rowCounts); this.byteCounts = Longs.toArray(byteCounts); - int estimatedSize = SizedUtil.OBJECT_SIZE + this.gpTimestamps = Longs.toArray(updateTimes); + // Those Java equivalents of sizeof() in C/C++, mentioned on the Web, might be overkill here. 
+ int estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE + guidePosts.getLength() // guidePosts + SizedUtil.INT_SIZE // maxLength + SizedUtil.INT_SIZE // guidePostsCount + SizedUtil.ARRAY_SIZE + this.rowCounts.length * SizedUtil.LONG_SIZE // rowCounts + SizedUtil.ARRAY_SIZE + this.byteCounts.length * SizedUtil.LONG_SIZE // byteCounts + + SizedUtil.ARRAY_SIZE + this.gpTimestamps.length * SizedUtil.LONG_SIZE // gpTimestamps + SizedUtil.INT_SIZE; // estimatedSize this.estimatedSize = estimatedSize; - this.gpTimestamps = Longs.toArray(updateTimes); } public ImmutableBytesWritable getGuidePosts() { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixStatsCacheLoaderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixStatsCacheLoaderTest.java new file mode 100644 index 0000000..e9c6d40 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixStatsCacheLoaderTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.query; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.Weigher; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.schema.stats.GuidePostsInfo; +import org.apache.phoenix.schema.stats.GuidePostsKey; +import org.apache.phoenix.util.ByteUtil; +import org.junit.Test; + +import java.lang.Thread; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; + +/** + * Test class around the PhoenixStatsCacheLoader. + */ +public class PhoenixStatsCacheLoaderTest { + /** + * {@link PhoenixStatsLoader} test implementation for the Stats Loader. + */ + protected class TestStatsLoaderImpl implements PhoenixStatsLoader { + private int maxLength = 1; + private final CountDownLatch firstTimeRefreshedSignal; + private final CountDownLatch secondTimeRefreshedSignal; + + public TestStatsLoaderImpl(CountDownLatch firstTimeRefreshedSignal, CountDownLatch secondTimeRefreshedSignal) { + this.firstTimeRefreshedSignal = firstTimeRefreshedSignal; + this.secondTimeRefreshedSignal = secondTimeRefreshedSignal; + } + + @Override + public boolean needsLoad() { + // Whenever it's called, we try to load stats from stats table + // no matter it has been updated or not. 
+ return true; + } + + @Override + public GuidePostsInfo loadStats(GuidePostsKey statsKey) throws Exception { + return new GuidePostsInfo(Collections.<Long> emptyList(), + new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY), + Collections.<Long> emptyList(), maxLength++, 0, Collections.<Long> emptyList()); + } + + @Override + public GuidePostsInfo loadStats(GuidePostsKey statsKey, GuidePostsInfo prevGuidepostInfo) throws Exception { + firstTimeRefreshedSignal.countDown(); + secondTimeRefreshedSignal.countDown(); + + return new GuidePostsInfo(Collections.<Long> emptyList(), + new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY), + Collections.<Long> emptyList(), maxLength++, 0, Collections.<Long> emptyList()); + } + } + + GuidePostsInfo getStats(LoadingCache<GuidePostsKey, GuidePostsInfo> cache, GuidePostsKey guidePostsKey) { + GuidePostsInfo guidePostsInfo; + try { + guidePostsInfo = cache.get(guidePostsKey); + } catch (ExecutionException e) { + assertFalse(true); + return GuidePostsInfo.NO_GUIDEPOST; + } + + return guidePostsInfo; + } + + void sleep(int x) { + try { + Thread.sleep(x); + } + catch (InterruptedException e) { + assertFalse(true); + } + } + + @Test + public void testStatsBeingAutomaticallyRefreshed() { + ExecutorService executor = Executors.newFixedThreadPool(4); + + CountDownLatch firstTimeRefreshedSignal = new CountDownLatch(1); + CountDownLatch secondTimeRefreshedSignal = new CountDownLatch(2); + + Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); + + LoadingCache<GuidePostsKey, GuidePostsInfo> cache = CacheBuilder.newBuilder() + // Refresh entries a given amount of time after they were written + .refreshAfterWrite(100, TimeUnit.MILLISECONDS) + // Maximum total weight (size in bytes) of stats entries + .maximumWeight(QueryServicesOptions.DEFAULT_STATS_MAX_CACHE_SIZE) + // Defer actual size to the PTableStats.getEstimatedSize() + .weigher(new Weigher<GuidePostsKey, GuidePostsInfo>() { + @Override public int 
weigh(GuidePostsKey key, GuidePostsInfo info) { + return info.getEstimatedSize(); + } + }) + // Log removals at TRACE for debugging + .removalListener(new GuidePostsCache.PhoenixStatsCacheRemovalListener()) + // Automatically load the cache when entries are missing + .build(new PhoenixStatsCacheLoader(new TestStatsLoaderImpl( + firstTimeRefreshedSignal, secondTimeRefreshedSignal), config)); + + try { + GuidePostsKey guidePostsKey = new GuidePostsKey(new byte[4], new byte[4]); + GuidePostsInfo guidePostsInfo = getStats(cache, guidePostsKey); + assertTrue(guidePostsInfo.getMaxLength() == 1); + + // Note: With Guava cache, automatic refreshes are performed when the first stale request for an entry occurs. + + // After we sleep here for any time which is larger than the refresh cycle, the refresh of cache entry will be + // triggered for its first time by the call of getStats(). This is deterministic behavior, and it won't cause + // randomized test failures. + sleep(150); + guidePostsInfo = getStats(cache, guidePostsKey); + // Refresh has been triggered for its first time, but still could get the old value + assertTrue(guidePostsInfo.getMaxLength() >= 1); + firstTimeRefreshedSignal.await(); + + sleep(150); + guidePostsInfo = getStats(cache, guidePostsKey); + // Now the second time refresh has been triggered by the above getStats() call, the first time Refresh has completed + // and the cache entry has been updated for sure. + assertTrue(guidePostsInfo.getMaxLength() >= 2); + secondTimeRefreshedSignal.await(); + } + catch (InterruptedException e) { + assertFalse(true); + } + } +} \ No newline at end of file