Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 01467bde8 -> 3d5401ce6
PHOENIX-4165 Do not wait when no new memory chunk can be allocated. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3d5401ce Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3d5401ce Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3d5401ce Branch: refs/heads/4.x-HBase-1.1 Commit: 3d5401ce60a4a2e5807d3c2d88118392622e0bb9 Parents: 01467bd Author: Lars Hofhansl <[email protected]> Authored: Fri Sep 8 21:50:25 2017 -0700 Committer: Lars Hofhansl <[email protected]> Committed: Fri Sep 8 21:50:25 2017 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/cache/GlobalCache.java | 4 +- .../phoenix/memory/GlobalMemoryManager.java | 52 ++--- .../phoenix/query/BaseQueryServicesImpl.java | 3 +- .../org/apache/phoenix/query/QueryServices.java | 1 - .../phoenix/query/QueryServicesOptions.java | 11 - .../apache/phoenix/cache/TenantCacheTest.java | 6 +- .../iterate/SpoolingResultIteratorTest.java | 2 +- .../phoenix/memory/MemoryManagerTest.java | 208 +++++-------------- .../phoenix/query/QueryServicesTestImpl.java | 1 - 9 files changed, 73 insertions(+), 215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d5401ce/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java index 0c3a87a..43df5db 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java @@ -19,7 +19,6 @@ package org.apache.phoenix.cache; import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB; import static 
org.apache.phoenix.query.QueryServices.MAX_MEMORY_SIZE_ATTRIB; -import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB; import java.util.Map; @@ -150,8 +149,7 @@ public class GlobalCache extends TenantCacheImpl { } private GlobalCache(Configuration config) { - super(new GlobalMemoryManager(getMaxMemorySize(config), - config.getInt(MAX_MEMORY_WAIT_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MEMORY_WAIT_MS)), + super(new GlobalMemoryManager(getMaxMemorySize(config)), config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS)); this.config = config; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d5401ce/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java index 6dbd78d..651526f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java @@ -17,7 +17,6 @@ */ package org.apache.phoenix.memory; -import com.google.common.annotations.VisibleForTesting; import org.apache.http.annotation.GuardedBy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,18 +32,14 @@ public class GlobalMemoryManager implements MemoryManager { private final Object sync = new Object(); private final long maxMemoryBytes; - private final int maxWaitMs; @GuardedBy("sync") private volatile long usedMemoryBytes; - public GlobalMemoryManager(long maxBytes, int maxWaitMs) { + public GlobalMemoryManager(long maxBytes) { if (maxBytes <= 0) { - throw new IllegalStateException("Total number of available bytes (" + maxBytes + ") must be 
greater than zero"); - } - if (maxWaitMs < 0) { - throw new IllegalStateException("Maximum wait time (" + maxWaitMs + ") must be greater than or equal to zero"); + throw new IllegalStateException( + "Total number of available bytes (" + maxBytes + ") must be greater than zero"); } this.maxMemoryBytes = maxBytes; - this.maxWaitMs = maxWaitMs; this.usedMemoryBytes = 0; } @@ -61,45 +56,34 @@ public class GlobalMemoryManager implements MemoryManager { } - // TODO: Work on fairness: One big memory request can cause all others to block here. + // TODO: Work on fairness: One big memory request can cause all others to fail here. private long allocateBytes(long minBytes, long reqBytes) { if (minBytes < 0 || reqBytes < 0) { - throw new IllegalStateException("Minimum requested bytes (" + minBytes + ") and requested bytes (" + reqBytes + ") must be greater than zero"); + throw new IllegalStateException("Minimum requested bytes (" + minBytes + + ") and requested bytes (" + reqBytes + ") must be greater than zero"); } - if (minBytes > maxMemoryBytes) { // No need to wait, since we'll never have this much available - throw new InsufficientMemoryException("Requested memory of " + minBytes + " bytes is larger than global pool of " + maxMemoryBytes + " bytes."); + if (minBytes > maxMemoryBytes) { + throw new InsufficientMemoryException("Requested memory of " + minBytes + + " bytes is larger than global pool of " + maxMemoryBytes + " bytes."); } - long startTimeMs = System.currentTimeMillis(); // Get time outside of sync block to account for waiting for lock long nBytes; synchronized(sync) { - while (usedMemoryBytes + minBytes > maxMemoryBytes) { // Only wait if minBytes not available - waitForBytesToFree(minBytes, startTimeMs); + if (usedMemoryBytes + minBytes > maxMemoryBytes) { + throw new InsufficientMemoryException("Requested memory of " + minBytes + + " bytes could not be allocated. 
Using memory of " + usedMemoryBytes + + " bytes from global pool of " + maxMemoryBytes); } // Allocate at most reqBytes, but at least minBytes nBytes = Math.min(reqBytes, maxMemoryBytes - usedMemoryBytes); if (nBytes < minBytes) { - throw new IllegalStateException("Allocated bytes (" + nBytes + ") should be at least the minimum requested bytes (" + minBytes + ")"); + throw new IllegalStateException("Allocated bytes (" + nBytes + + ") should be at least the minimum requested bytes (" + minBytes + ")"); } usedMemoryBytes += nBytes; } return nBytes; } - @VisibleForTesting - void waitForBytesToFree(long minBytes, long startTimeMs) { - try { - logger.debug("Waiting for " + (usedMemoryBytes + minBytes - maxMemoryBytes) + " bytes to be free " + startTimeMs); - long remainingWaitTimeMs = maxWaitMs - (System.currentTimeMillis() - startTimeMs); - if (remainingWaitTimeMs <= 0) { // Ran out of time waiting for some memory to get freed up - throw new InsufficientMemoryException("Requested memory of " + minBytes + " bytes could not be allocated. Using memory of " + usedMemoryBytes + " bytes from global pool of " + maxMemoryBytes + " bytes after waiting for " + maxWaitMs + "ms."); - } - sync.wait(remainingWaitTimeMs); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted allocation of " + minBytes + " bytes", ie); - } - } - @Override public MemoryChunk allocate(long minBytes, long reqBytes) { long nBytes = allocateBytes(minBytes, reqBytes); @@ -130,9 +114,7 @@ public class GlobalMemoryManager implements MemoryManager { @Override public long getSize() { - synchronized(sync) { - return size; // TODO: does this need to be synchronized? 
- } + return size; } @Override @@ -145,7 +127,6 @@ public class GlobalMemoryManager implements MemoryManager { if (nAdditionalBytes < 0) { usedMemoryBytes += nAdditionalBytes; size = nBytes; - sync.notifyAll(); } else { allocateBytes(nAdditionalBytes, nAdditionalBytes); size = nBytes; @@ -174,7 +155,6 @@ public class GlobalMemoryManager implements MemoryManager { synchronized(sync) { usedMemoryBytes -= size; size = 0; - sync.notifyAll(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d5401ce/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java index c16b86d..e46c262 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java @@ -47,8 +47,7 @@ public abstract class BaseQueryServicesImpl implements QueryServices { options.getQueueSize(), options.isGlobalMetricsEnabled()); this.memoryManager = new GlobalMemoryManager( - Runtime.getRuntime().maxMemory() * options.getMaxMemoryPerc() / 100, - options.getMaxMemoryWaitMs()); + Runtime.getRuntime().maxMemory() * options.getMaxMemoryPerc() / 100); this.props = options.getProps(defaultProps); this.queryOptimizer = new QueryOptimizer(this); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d5401ce/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 0c3b25b..a1d9761 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -74,7 +74,6 @@ public interface QueryServices extends SQLCloseable { public static final String SCAN_RESULT_CHUNK_SIZE = "phoenix.query.scanResultChunkSize"; public static final String MAX_MEMORY_PERC_ATTRIB = "phoenix.query.maxGlobalMemoryPercentage"; - public static final String MAX_MEMORY_WAIT_MS_ATTRIB = "phoenix.query.maxGlobalMemoryWaitMs"; public static final String MAX_TENANT_MEMORY_PERC_ATTRIB = "phoenix.query.maxTenantMemoryPercentage"; public static final String MAX_SERVER_CACHE_SIZE_ATTRIB = "phoenix.query.maxServerCacheBytes"; public static final String DATE_FORMAT_TIMEZONE_ATTRIB = "phoenix.query.dateFormatTimeZone"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d5401ce/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index feaf5dd..6ff096f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -48,7 +48,6 @@ import static org.apache.phoenix.query.QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB; -import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_MUTATION_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB; @@ -128,7 +127,6 @@ 
public class QueryServicesOptions { public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m public static final String DEFAULT_SPOOL_DIRECTORY = System.getProperty("java.io.tmpdir"); public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap - public static final int DEFAULT_MAX_MEMORY_WAIT_MS = 10000; public static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100; public static final long DEFAULT_MAX_SERVER_CACHE_SIZE = 1024*1024*100; // 100 Mb public static final int DEFAULT_TARGET_QUERY_CONCURRENCY = 32; @@ -362,7 +360,6 @@ public class QueryServicesOptions { .setIfUnset(SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_SPOOL_THRESHOLD_BYTES) .setIfUnset(SPOOL_DIRECTORY, DEFAULT_SPOOL_DIRECTORY) .setIfUnset(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC) - .setIfUnset(MAX_MEMORY_WAIT_MS_ATTRIB, DEFAULT_MAX_MEMORY_WAIT_MS) .setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_TENANT_MEMORY_PERC) .setIfUnset(MAX_SERVER_CACHE_SIZE_ATTRIB, DEFAULT_MAX_SERVER_CACHE_SIZE) .setIfUnset(SCAN_CACHE_SIZE_ATTRIB, DEFAULT_SCAN_CACHE_SIZE) @@ -475,10 +472,6 @@ public class QueryServicesOptions { return set(MAX_MEMORY_PERC_ATTRIB, maxMemoryPerc); } - public QueryServicesOptions setMaxMemoryWaitMs(int maxMemoryWaitMs) { - return set(MAX_MEMORY_WAIT_MS_ATTRIB, maxMemoryWaitMs); - } - public QueryServicesOptions setMaxTenantMemoryPerc(int maxTenantMemoryPerc) { return set(MAX_TENANT_MEMORY_PERC_ATTRIB, maxTenantMemoryPerc); } @@ -568,10 +561,6 @@ public class QueryServicesOptions { return config.getInt(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC); } - public int getMaxMemoryWaitMs() { - return config.getInt(MAX_MEMORY_WAIT_MS_ATTRIB, DEFAULT_MAX_MEMORY_WAIT_MS); - } - public int getMaxMutateSize() { return config.getInt(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d5401ce/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java index ade5239..932149c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java @@ -42,8 +42,7 @@ public class TenantCacheTest { public void testInvalidateClosesMemoryChunk() throws SQLException { int maxServerCacheTimeToLive = 10000; long maxBytes = 1000; - int maxWaitMs = 1000; - GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes, maxWaitMs); + GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes); TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive); ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes("a")); ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a")); @@ -57,8 +56,7 @@ public class TenantCacheTest { public void testTimeoutClosesMemoryChunk() throws Exception { int maxServerCacheTimeToLive = 10; long maxBytes = 1000; - int maxWaitMs = 10; - GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes, maxWaitMs); + GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes); ManualTicker ticker = new ManualTicker(); TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker); ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a")); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d5401ce/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java index 
5ae1a56..e0a731d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java @@ -53,7 +53,7 @@ public class SpoolingResultIteratorTest { new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), }; - MemoryManager memoryManager = new DelegatingMemoryManager(new GlobalMemoryManager(threshold, 0)); + MemoryManager memoryManager = new DelegatingMemoryManager(new GlobalMemoryManager(threshold)); ResultIterator scanner = new SpoolingResultIterator(SpoolingMetricsHolder.NO_OP_INSTANCE, MemoryMetricsHolder.NO_OP_INSTANCE, iterator, memoryManager, threshold, maxSizeSpool,"/tmp"); AssertResults.assertResults(scanner, expectedResults); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d5401ce/phoenix-core/src/test/java/org/apache/phoenix/memory/MemoryManagerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/memory/MemoryManagerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/memory/MemoryManagerTest.java index cb6498d..6da2526 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/memory/MemoryManagerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/memory/MemoryManagerTest.java @@ -19,17 +19,15 @@ package org.apache.phoenix.memory; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.spy; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; -import org.junit.Ignore; import org.junit.Test; -import org.mockito.Mockito; /** * @@ -42,7 +40,7 @@ import 
org.mockito.Mockito; public class MemoryManagerTest { @Test public void testOverGlobalMemoryLimit() throws Exception { - GlobalMemoryManager gmm = new GlobalMemoryManager(250,1); + GlobalMemoryManager gmm = new GlobalMemoryManager(250); try { gmm.allocate(300); fail(); @@ -64,157 +62,9 @@ public class MemoryManagerTest { assertTrue(rmm1.getAvailableMemory() == rmm1.getMaxMemory()); } - - private static void sleepFor(long time) { - try { - Thread.sleep(time); - } catch (InterruptedException x) { - fail(); - } - } - - @Ignore("See PHOENIX-2840") - @Test - public void testWaitForMemoryAvailable() throws Exception { - final GlobalMemoryManager gmm = spy(new GlobalMemoryManager(100, 80)); - final ChildMemoryManager rmm1 = new ChildMemoryManager(gmm,100); - final ChildMemoryManager rmm2 = new ChildMemoryManager(gmm,100); - final CountDownLatch latch = new CountDownLatch(2); - Thread t1 = new Thread() { - @Override - public void run() { - MemoryChunk c1 = rmm1.allocate(50); - MemoryChunk c2 = rmm1.allocate(50); - sleepFor(40); - c1.close(); - sleepFor(20); - c2.close(); - latch.countDown(); - } - }; - Thread t2 = new Thread() { - @Override - public void run() { - sleepFor(20); - // Will require waiting for a bit of time before t1 frees the requested memory - MemoryChunk c3 = rmm2.allocate(50); - Mockito.verify(gmm, atLeastOnce()).waitForBytesToFree(anyLong(), anyLong()); - c3.close(); - latch.countDown(); - } - }; - t2.start(); - t1.start(); - latch.await(1, TimeUnit.SECONDS); - // Main thread competes with others to get all memory, but should wait - // until both threads are complete (since that's when the memory will - // again be all available. 
- ChildMemoryManager rmm = new ChildMemoryManager(gmm,100); - MemoryChunk c = rmm.allocate(100); - c.close(); - assertTrue(rmm.getAvailableMemory() == rmm.getMaxMemory()); - assertTrue(rmm1.getAvailableMemory() == rmm1.getMaxMemory()); - assertTrue(rmm2.getAvailableMemory() == rmm2.getMaxMemory()); - } - - @Ignore("See PHOENIX-2840") - @Test - public void testResizeWaitForMemoryAvailable() throws Exception { - final GlobalMemoryManager gmm = spy(new GlobalMemoryManager(100, 80)); - final ChildMemoryManager rmm1 = new ChildMemoryManager(gmm,100); - final ChildMemoryManager rmm2 = new ChildMemoryManager(gmm,100); - final CountDownLatch latch = new CountDownLatch(2); - Thread t1 = new Thread() { - @Override - public void run() { - MemoryChunk c1 = rmm1.allocate(50); - MemoryChunk c2 = rmm1.allocate(40); - sleepFor(40); - c1.close(); - sleepFor(20); - c2.close(); - latch.countDown(); - } - }; - Thread t2 = new Thread() { - @Override - public void run() { - sleepFor(20); - MemoryChunk c3 = rmm2.allocate(10); - // Will require waiting for a bit of time before t1 frees the requested memory - c3.resize(50); - Mockito.verify(gmm, atLeastOnce()).waitForBytesToFree(anyLong(), anyLong()); - c3.close(); - latch.countDown(); - } - }; - t1.start(); - t2.start(); - latch.await(1, TimeUnit.SECONDS); - // Main thread competes with others to get all memory, but should wait - // until both threads are complete (since that's when the memory will - // again be all available. 
- ChildMemoryManager rmm = new ChildMemoryManager(gmm,100); - MemoryChunk c = rmm.allocate(100); - c.close(); - assertTrue(rmm.getAvailableMemory() == rmm.getMaxMemory()); - assertTrue(rmm1.getAvailableMemory() == rmm1.getMaxMemory()); - assertTrue(rmm2.getAvailableMemory() == rmm2.getMaxMemory()); - } - - @Ignore("See PHOENIX-2840") - @Test - public void testWaitUntilResize() throws Exception { - final GlobalMemoryManager gmm = spy(new GlobalMemoryManager(100, 80)); - final ChildMemoryManager rmm1 = new ChildMemoryManager(gmm,100); - final MemoryChunk c1 = rmm1.allocate(70); - final CountDownLatch latch = new CountDownLatch(2); - - Thread t1 = new Thread() { - @Override - public void run() { - MemoryChunk c2 = rmm1.allocate(20); - sleepFor(40); - c1.resize(20); // resize down to test that other thread is notified - sleepFor(20); - c2.close(); - c1.close(); - assertTrue(rmm1.getAvailableMemory() == rmm1.getMaxMemory()); - latch.countDown(); - } - }; - Thread t2 = new Thread() { - @Override - public void run() { - sleepFor(20); - ChildMemoryManager rmm2 = new ChildMemoryManager(gmm,100); - MemoryChunk c3 = rmm2.allocate(10); - long startTime = System.currentTimeMillis(); - c3.resize(60); // Test that resize waits if memory not available - assertTrue(c1.getSize() == 20); // c1 was resized not closed - // we waited some time before the allocate happened - - Mockito.verify(gmm, atLeastOnce()).waitForBytesToFree(anyLong(), anyLong()); - c3.close(); - assertTrue(rmm2.getAvailableMemory() == rmm2.getMaxMemory()); - latch.countDown(); - } - }; - t1.start(); - t2.start(); - latch.await(1, TimeUnit.SECONDS); - // Main thread competes with others to get all memory, but should wait - // until both threads are complete (since that's when the memory will - // again be all available. 
- ChildMemoryManager rmm = new ChildMemoryManager(gmm,100); - MemoryChunk c = rmm.allocate(100); - c.close(); - assertTrue(rmm.getAvailableMemory() == rmm.getMaxMemory()); - } - @Test public void testChildDecreaseAllocation() throws Exception { - MemoryManager gmm = spy(new GlobalMemoryManager(100, 1)); + MemoryManager gmm = spy(new GlobalMemoryManager(100)); ChildMemoryManager rmm1 = new ChildMemoryManager(gmm,100); ChildMemoryManager rmm2 = new ChildMemoryManager(gmm,10); MemoryChunk c1 = rmm1.allocate(50); @@ -229,7 +79,7 @@ public class MemoryManagerTest { @Test public void testOverChildMemoryLimit() throws Exception { - MemoryManager gmm = new GlobalMemoryManager(100,1); + MemoryManager gmm = new GlobalMemoryManager(100); ChildMemoryManager rmm1 = new ChildMemoryManager(gmm,25); ChildMemoryManager rmm2 = new ChildMemoryManager(gmm,25); ChildMemoryManager rmm3 = new ChildMemoryManager(gmm,25); @@ -281,4 +131,50 @@ public class MemoryManagerTest { assertTrue(rmm3.getAvailableMemory() == rmm3.getMaxMemory()); assertTrue(rmm4.getAvailableMemory() == rmm4.getMaxMemory()); } + + @Test + public void testConcurrentAllocation() throws Exception { + int THREADS = 100; + + // each thread will attempt up to 100 allocations on average. + final GlobalMemoryManager gmm = new GlobalMemoryManager(THREADS * 1000); + final AtomicInteger count = new AtomicInteger(0); + final CountDownLatch barrier = new CountDownLatch(THREADS); + final CountDownLatch barrier2 = new CountDownLatch(THREADS); + final CountDownLatch signal = new CountDownLatch(1); + /* + * each thread will allocate chunks of 10 bytes, until no more memory is available. 
+ */ + for (int i = 0; i < THREADS; i++) { + new Thread(new Runnable() { + List<MemoryChunk> chunks = new ArrayList<>(); + @Override + public void run() { + try { + while(true) { + Thread.sleep(1); + chunks.add(gmm.allocate(10)); + count.incrementAndGet(); + } + } catch (InsufficientMemoryException e) { + barrier.countDown(); + // wait for the signal to go ahead + try {signal.await();} catch (InterruptedException ix) {} + for (MemoryChunk chunk : chunks) { + chunk.close(); + } + barrier2.countDown(); + } catch (InterruptedException ix) {} + } + }).start(); + } + // wait until all threads failed an allocation + barrier.await(); + // make sure all memory was used + assertTrue(gmm.getAvailableMemory() == 0); + // let the threads end, and free their memory + signal.countDown(); barrier2.await(); + // make sure all memory is freed + assertTrue(gmm.getAvailableMemory() == gmm.getMaxMemory()); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d5401ce/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index fafd321..0b69206 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -89,7 +89,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { .setThreadTimeoutMs(DEFAULT_THREAD_TIMEOUT_MS) .setSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES) .setSpoolDirectory(DEFAULT_SPOOL_DIRECTORY) - .setMaxMemoryWaitMs(DEFAULT_MAX_MEMORY_WAIT_MS) .setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC) .setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE) .setMaxServerCacheTTLMs(DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS)
