Repository: phoenix
Updated Branches:
  refs/heads/master 2ad5d4b48 -> 44c00345b


PHOENIX-4165 Do not wait when no new memory chunk can be allocated.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/44c00345
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/44c00345
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/44c00345

Branch: refs/heads/master
Commit: 44c00345b946cbdd9fc8205f8121cd60a065c0fe
Parents: 2ad5d4b
Author: Lars Hofhansl <[email protected]>
Authored: Fri Sep 8 21:54:11 2017 -0700
Committer: Lars Hofhansl <[email protected]>
Committed: Fri Sep 8 21:54:11 2017 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/cache/GlobalCache.java   |   4 +-
 .../phoenix/memory/GlobalMemoryManager.java     |  52 ++---
 .../phoenix/query/BaseQueryServicesImpl.java    |   3 +-
 .../org/apache/phoenix/query/QueryServices.java |   1 -
 .../phoenix/query/QueryServicesOptions.java     |  11 -
 .../apache/phoenix/cache/TenantCacheTest.java   |   6 +-
 .../iterate/SpoolingResultIteratorTest.java     |   2 +-
 .../phoenix/memory/MemoryManagerTest.java       | 208 +++++--------------
 .../phoenix/query/QueryServicesTestImpl.java    |   1 -
 9 files changed, 73 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/44c00345/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
index 0c3a87a..43df5db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.cache;
 
 import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_SIZE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
 
 import java.util.Map;
@@ -150,8 +149,7 @@ public class GlobalCache extends TenantCacheImpl {
     }
     
     private GlobalCache(Configuration config) {
-        super(new GlobalMemoryManager(getMaxMemorySize(config),
-                                      config.getInt(MAX_MEMORY_WAIT_MS_ATTRIB, 
QueryServicesOptions.DEFAULT_MAX_MEMORY_WAIT_MS)),
+        super(new GlobalMemoryManager(getMaxMemorySize(config)),
               
config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, 
QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS));
         this.config = config;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/44c00345/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java 
b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
index 6dbd78d..651526f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.memory;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.http.annotation.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,18 +32,14 @@ public class GlobalMemoryManager implements MemoryManager {
 
     private final Object sync = new Object();
     private final long maxMemoryBytes;
-    private final int maxWaitMs;
     @GuardedBy("sync")
     private volatile long usedMemoryBytes;
-    public GlobalMemoryManager(long maxBytes, int maxWaitMs) {
+    public GlobalMemoryManager(long maxBytes) {
         if (maxBytes <= 0) {
-            throw new IllegalStateException("Total number of available bytes 
(" + maxBytes + ") must be greater than zero");
-        }
-        if (maxWaitMs < 0) {
-            throw new IllegalStateException("Maximum wait time (" + maxWaitMs 
+ ") must be greater than or equal to zero");
+            throw new IllegalStateException(
+                    "Total number of available bytes (" + maxBytes + ") must 
be greater than zero");
         }
         this.maxMemoryBytes = maxBytes;
-        this.maxWaitMs = maxWaitMs;
         this.usedMemoryBytes = 0;
     }
 
@@ -61,45 +56,34 @@ public class GlobalMemoryManager implements MemoryManager {
     }
 
 
-    // TODO: Work on fairness: One big memory request can cause all others to 
block here.
+    // TODO: Work on fairness: One big memory request can cause all others to 
fail here.
     private long allocateBytes(long minBytes, long reqBytes) {
         if (minBytes < 0 || reqBytes < 0) {
-            throw new IllegalStateException("Minimum requested bytes (" + 
minBytes + ") and requested bytes (" + reqBytes + ") must be greater than 
zero");
+            throw new IllegalStateException("Minimum requested bytes (" + 
minBytes
+                    + ") and requested bytes (" + reqBytes + ") must be 
greater than zero");
         }
-        if (minBytes > maxMemoryBytes) { // No need to wait, since we'll never 
have this much available
-            throw new InsufficientMemoryException("Requested memory of " + 
minBytes + " bytes is larger than global pool of " + maxMemoryBytes + " 
bytes.");
+        if (minBytes > maxMemoryBytes) {
+            throw new InsufficientMemoryException("Requested memory of " + 
minBytes
+                    + " bytes is larger than global pool of " + maxMemoryBytes 
+ " bytes.");
         }
-        long startTimeMs = System.currentTimeMillis(); // Get time outside of 
sync block to account for waiting for lock
         long nBytes;
         synchronized(sync) {
-            while (usedMemoryBytes + minBytes > maxMemoryBytes) { // Only wait 
if minBytes not available
-                waitForBytesToFree(minBytes, startTimeMs);
+            if (usedMemoryBytes + minBytes > maxMemoryBytes) {
+                throw new InsufficientMemoryException("Requested memory of " + 
minBytes
+                        + " bytes could not be allocated. Using memory of " + 
usedMemoryBytes
+                        + " bytes from global pool of " + maxMemoryBytes);
             }
             // Allocate at most reqBytes, but at least minBytes
             nBytes = Math.min(reqBytes, maxMemoryBytes - usedMemoryBytes);
             if (nBytes < minBytes) {
-                throw new IllegalStateException("Allocated bytes (" + nBytes + 
") should be at least the minimum requested bytes (" + minBytes + ")");
+                throw new IllegalStateException("Allocated bytes (" + nBytes
+                        + ") should be at least the minimum requested bytes (" 
+ minBytes + ")");
             }
             usedMemoryBytes += nBytes;
         }
         return nBytes;
     }
 
-    @VisibleForTesting
-    void waitForBytesToFree(long minBytes, long startTimeMs) {
-        try {
-            logger.debug("Waiting for " + (usedMemoryBytes + minBytes - 
maxMemoryBytes) + " bytes to be free " + startTimeMs);
-            long remainingWaitTimeMs = maxWaitMs - (System.currentTimeMillis() 
- startTimeMs);
-            if (remainingWaitTimeMs <= 0) { // Ran out of time waiting for 
some memory to get freed up
-                throw new InsufficientMemoryException("Requested memory of " + 
minBytes + " bytes could not be allocated. Using memory of " + usedMemoryBytes 
+ " bytes from global pool of " + maxMemoryBytes + " bytes after waiting for " 
+ maxWaitMs + "ms.");
-            }
-            sync.wait(remainingWaitTimeMs);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException("Interrupted allocation of " + minBytes 
+ " bytes", ie);
-        }
-    }
-
     @Override
     public MemoryChunk allocate(long minBytes, long reqBytes) {
         long nBytes = allocateBytes(minBytes, reqBytes);
@@ -130,9 +114,7 @@ public class GlobalMemoryManager implements MemoryManager {
 
         @Override
         public long getSize() {
-            synchronized(sync) {
-                return size; // TODO: does this need to be synchronized?
-            }
+            return size;
         }
 
         @Override
@@ -145,7 +127,6 @@ public class GlobalMemoryManager implements MemoryManager {
                 if (nAdditionalBytes < 0) {
                     usedMemoryBytes += nAdditionalBytes;
                     size = nBytes;
-                    sync.notifyAll();
                 } else {
                     allocateBytes(nAdditionalBytes, nAdditionalBytes);
                     size = nBytes;
@@ -174,7 +155,6 @@ public class GlobalMemoryManager implements MemoryManager {
             synchronized(sync) {
                 usedMemoryBytes -= size;
                 size = 0;
-                sync.notifyAll();
             }
         }
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/44c00345/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
index c16b86d..e46c262 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
@@ -47,8 +47,7 @@ public abstract class BaseQueryServicesImpl implements 
QueryServices {
                 options.getQueueSize(),
                 options.isGlobalMetricsEnabled());
         this.memoryManager = new GlobalMemoryManager(
-                Runtime.getRuntime().maxMemory() * options.getMaxMemoryPerc() 
/ 100,
-                options.getMaxMemoryWaitMs());
+                Runtime.getRuntime().maxMemory() * options.getMaxMemoryPerc() 
/ 100);
         this.props = options.getProps(defaultProps);
         this.queryOptimizer = new QueryOptimizer(this);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/44c00345/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 0c3b25b..a1d9761 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -74,7 +74,6 @@ public interface QueryServices extends SQLCloseable {
     public static final String SCAN_RESULT_CHUNK_SIZE = 
"phoenix.query.scanResultChunkSize";
 
     public static final String MAX_MEMORY_PERC_ATTRIB = 
"phoenix.query.maxGlobalMemoryPercentage";
-    public static final String MAX_MEMORY_WAIT_MS_ATTRIB = 
"phoenix.query.maxGlobalMemoryWaitMs";
     public static final String MAX_TENANT_MEMORY_PERC_ATTRIB = 
"phoenix.query.maxTenantMemoryPercentage";
     public static final String MAX_SERVER_CACHE_SIZE_ATTRIB = 
"phoenix.query.maxServerCacheBytes";
     public static final String DATE_FORMAT_TIMEZONE_ATTRIB = 
"phoenix.query.dateFormatTimeZone";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/44c00345/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index feaf5dd..6ff096f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -48,7 +48,6 @@ import static 
org.apache.phoenix.query.QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_
 import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_MUTATION_SIZE_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB;
@@ -128,7 +127,6 @@ public class QueryServicesOptions {
        public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 
20; // 20m
     public static final String DEFAULT_SPOOL_DIRECTORY = 
System.getProperty("java.io.tmpdir");
        public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap
-       public static final int DEFAULT_MAX_MEMORY_WAIT_MS = 10000;
        public static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100;
        public static final long DEFAULT_MAX_SERVER_CACHE_SIZE = 1024*1024*100; 
 // 100 Mb
     public static final int DEFAULT_TARGET_QUERY_CONCURRENCY = 32;
@@ -362,7 +360,6 @@ public class QueryServicesOptions {
             .setIfUnset(SPOOL_THRESHOLD_BYTES_ATTRIB, 
DEFAULT_SPOOL_THRESHOLD_BYTES)
             .setIfUnset(SPOOL_DIRECTORY, DEFAULT_SPOOL_DIRECTORY)
             .setIfUnset(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC)
-            .setIfUnset(MAX_MEMORY_WAIT_MS_ATTRIB, DEFAULT_MAX_MEMORY_WAIT_MS)
             .setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, 
DEFAULT_MAX_TENANT_MEMORY_PERC)
             .setIfUnset(MAX_SERVER_CACHE_SIZE_ATTRIB, 
DEFAULT_MAX_SERVER_CACHE_SIZE)
             .setIfUnset(SCAN_CACHE_SIZE_ATTRIB, DEFAULT_SCAN_CACHE_SIZE)
@@ -475,10 +472,6 @@ public class QueryServicesOptions {
         return set(MAX_MEMORY_PERC_ATTRIB, maxMemoryPerc);
     }
 
-    public QueryServicesOptions setMaxMemoryWaitMs(int maxMemoryWaitMs) {
-        return set(MAX_MEMORY_WAIT_MS_ATTRIB, maxMemoryWaitMs);
-    }
-
     public QueryServicesOptions setMaxTenantMemoryPerc(int 
maxTenantMemoryPerc) {
         return set(MAX_TENANT_MEMORY_PERC_ATTRIB, maxTenantMemoryPerc);
     }
@@ -568,10 +561,6 @@ public class QueryServicesOptions {
         return config.getInt(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC);
     }
 
-    public int getMaxMemoryWaitMs() {
-        return config.getInt(MAX_MEMORY_WAIT_MS_ATTRIB, 
DEFAULT_MAX_MEMORY_WAIT_MS);
-    }
-
     public int getMaxMutateSize() {
         return config.getInt(MAX_MUTATION_SIZE_ATTRIB, 
DEFAULT_MAX_MUTATION_SIZE);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/44c00345/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
index ade5239..932149c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
@@ -42,8 +42,7 @@ public class TenantCacheTest {
     public void testInvalidateClosesMemoryChunk() throws SQLException {
         int maxServerCacheTimeToLive = 10000;
         long maxBytes = 1000;
-        int maxWaitMs = 1000;
-        GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes, 
maxWaitMs);
+        GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
         TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, 
maxServerCacheTimeToLive);
         ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes("a"));
         ImmutableBytesWritable cachePtr = new 
ImmutableBytesWritable(Bytes.toBytes("a"));
@@ -57,8 +56,7 @@ public class TenantCacheTest {
     public void testTimeoutClosesMemoryChunk() throws Exception {
         int maxServerCacheTimeToLive = 10;
         long maxBytes = 1000;
-        int maxWaitMs = 10;
-        GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes, 
maxWaitMs);
+        GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes);
         ManualTicker ticker = new ManualTicker();
         TenantCacheImpl cache = new TenantCacheImpl(memoryManager, 
maxServerCacheTimeToLive, ticker);
         ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/44c00345/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
index 5ae1a56..e0a731d 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
@@ -53,7 +53,7 @@ public class SpoolingResultIteratorTest {
                 new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(1))),
             };
 
-        MemoryManager memoryManager = new DelegatingMemoryManager(new 
GlobalMemoryManager(threshold, 0));
+        MemoryManager memoryManager = new DelegatingMemoryManager(new 
GlobalMemoryManager(threshold));
         ResultIterator scanner = new 
SpoolingResultIterator(SpoolingMetricsHolder.NO_OP_INSTANCE, 
MemoryMetricsHolder.NO_OP_INSTANCE, iterator, memoryManager, threshold, 
maxSizeSpool,"/tmp");
         AssertResults.assertResults(scanner, expectedResults);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/44c00345/phoenix-core/src/test/java/org/apache/phoenix/memory/MemoryManagerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/memory/MemoryManagerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/memory/MemoryManagerTest.java
index cb6498d..6da2526 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/memory/MemoryManagerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/memory/MemoryManagerTest.java
@@ -19,17 +19,15 @@ package org.apache.phoenix.memory;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.spy;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
-import org.junit.Ignore;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 /**
  *
@@ -42,7 +40,7 @@ import org.mockito.Mockito;
 public class MemoryManagerTest {
     @Test
     public void testOverGlobalMemoryLimit() throws Exception {
-        GlobalMemoryManager gmm = new GlobalMemoryManager(250,1);
+        GlobalMemoryManager gmm = new GlobalMemoryManager(250);
         try {
             gmm.allocate(300);
             fail();
@@ -64,157 +62,9 @@ public class MemoryManagerTest {
         assertTrue(rmm1.getAvailableMemory() == rmm1.getMaxMemory());
     }
 
-
-    private static void sleepFor(long time) {
-        try {
-            Thread.sleep(time);
-        } catch (InterruptedException x) {
-            fail();
-        }
-    }
-
-    @Ignore("See PHOENIX-2840")
-    @Test
-    public void testWaitForMemoryAvailable() throws Exception {
-        final GlobalMemoryManager gmm = spy(new GlobalMemoryManager(100, 80));
-        final ChildMemoryManager rmm1 = new ChildMemoryManager(gmm,100);
-        final ChildMemoryManager rmm2 = new ChildMemoryManager(gmm,100);
-        final CountDownLatch latch = new CountDownLatch(2);
-        Thread t1 = new Thread() {
-            @Override
-            public void run() {
-                MemoryChunk c1 = rmm1.allocate(50);
-                MemoryChunk c2 = rmm1.allocate(50);
-                sleepFor(40);
-                c1.close();
-                sleepFor(20);
-                c2.close();
-                latch.countDown();
-            }
-        };
-        Thread t2 = new Thread() {
-            @Override
-            public void run() {
-                sleepFor(20);
-                // Will require waiting for a bit of time before t1 frees the 
requested memory
-                MemoryChunk c3 = rmm2.allocate(50);
-                Mockito.verify(gmm, 
atLeastOnce()).waitForBytesToFree(anyLong(), anyLong());
-                c3.close();
-                latch.countDown();
-            }
-        };
-        t2.start();
-        t1.start();
-        latch.await(1, TimeUnit.SECONDS);
-        // Main thread competes with others to get all memory, but should wait
-        // until both threads are complete (since that's when the memory will
-        // again be all available.
-        ChildMemoryManager rmm = new ChildMemoryManager(gmm,100);
-        MemoryChunk c = rmm.allocate(100);
-        c.close();
-        assertTrue(rmm.getAvailableMemory() == rmm.getMaxMemory());
-        assertTrue(rmm1.getAvailableMemory() == rmm1.getMaxMemory());
-        assertTrue(rmm2.getAvailableMemory() == rmm2.getMaxMemory());
-    }
-
-    @Ignore("See PHOENIX-2840")
-    @Test
-    public void testResizeWaitForMemoryAvailable() throws Exception {
-        final GlobalMemoryManager gmm = spy(new GlobalMemoryManager(100, 80));
-        final ChildMemoryManager rmm1 = new ChildMemoryManager(gmm,100);
-        final ChildMemoryManager rmm2 = new ChildMemoryManager(gmm,100);
-        final CountDownLatch latch = new CountDownLatch(2);
-        Thread t1 = new Thread() {
-            @Override
-            public void run() {
-                MemoryChunk c1 = rmm1.allocate(50);
-                MemoryChunk c2 = rmm1.allocate(40);
-                sleepFor(40);
-                c1.close();
-                sleepFor(20);
-                c2.close();
-                latch.countDown();
-            }
-        };
-        Thread t2 = new Thread() {
-            @Override
-            public void run() {
-                sleepFor(20);
-                MemoryChunk c3 = rmm2.allocate(10);
-                // Will require waiting for a bit of time before t1 frees the 
requested memory
-                c3.resize(50);
-                Mockito.verify(gmm, 
atLeastOnce()).waitForBytesToFree(anyLong(), anyLong());
-                c3.close();
-                latch.countDown();
-            }
-        };
-        t1.start();
-        t2.start();
-        latch.await(1, TimeUnit.SECONDS);
-        // Main thread competes with others to get all memory, but should wait
-        // until both threads are complete (since that's when the memory will
-        // again be all available.
-        ChildMemoryManager rmm = new ChildMemoryManager(gmm,100);
-        MemoryChunk c = rmm.allocate(100);
-        c.close();
-        assertTrue(rmm.getAvailableMemory() == rmm.getMaxMemory());
-        assertTrue(rmm1.getAvailableMemory() == rmm1.getMaxMemory());
-        assertTrue(rmm2.getAvailableMemory() == rmm2.getMaxMemory());
-    }
-
-    @Ignore("See PHOENIX-2840")
-    @Test
-    public void testWaitUntilResize() throws Exception {
-        final GlobalMemoryManager gmm = spy(new GlobalMemoryManager(100, 80));
-        final ChildMemoryManager rmm1 = new ChildMemoryManager(gmm,100);
-        final MemoryChunk c1 = rmm1.allocate(70);
-        final CountDownLatch latch = new CountDownLatch(2);
-
-        Thread t1 = new Thread() {
-            @Override
-            public void run() {
-                MemoryChunk c2 = rmm1.allocate(20);
-                sleepFor(40);
-                c1.resize(20); // resize down to test that other thread is 
notified
-                sleepFor(20);
-                c2.close();
-                c1.close();
-                assertTrue(rmm1.getAvailableMemory() == rmm1.getMaxMemory());
-                latch.countDown();
-            }
-        };
-        Thread t2 = new Thread() {
-            @Override
-            public void run() {
-                sleepFor(20);
-                ChildMemoryManager rmm2 = new ChildMemoryManager(gmm,100);
-                MemoryChunk c3 = rmm2.allocate(10);
-                long startTime = System.currentTimeMillis();
-                c3.resize(60); // Test that resize waits if memory not 
available
-                assertTrue(c1.getSize() == 20); // c1 was resized not closed
-                // we waited some time before the allocate happened
-
-                Mockito.verify(gmm, 
atLeastOnce()).waitForBytesToFree(anyLong(), anyLong());
-                c3.close();
-                assertTrue(rmm2.getAvailableMemory() == rmm2.getMaxMemory());
-                latch.countDown();
-            }
-        };
-        t1.start();
-        t2.start();
-        latch.await(1, TimeUnit.SECONDS);
-        // Main thread competes with others to get all memory, but should wait
-        // until both threads are complete (since that's when the memory will
-        // again be all available.
-        ChildMemoryManager rmm = new ChildMemoryManager(gmm,100);
-        MemoryChunk c = rmm.allocate(100);
-        c.close();
-        assertTrue(rmm.getAvailableMemory() == rmm.getMaxMemory());
-    }
-
     @Test
     public void testChildDecreaseAllocation() throws Exception {
-        MemoryManager gmm = spy(new GlobalMemoryManager(100, 1));
+        MemoryManager gmm = spy(new GlobalMemoryManager(100));
         ChildMemoryManager rmm1 = new ChildMemoryManager(gmm,100);
         ChildMemoryManager rmm2 = new ChildMemoryManager(gmm,10);
         MemoryChunk c1 = rmm1.allocate(50);
@@ -229,7 +79,7 @@ public class MemoryManagerTest {
 
     @Test
     public void testOverChildMemoryLimit() throws Exception {
-        MemoryManager gmm = new GlobalMemoryManager(100,1);
+        MemoryManager gmm = new GlobalMemoryManager(100);
         ChildMemoryManager rmm1 = new ChildMemoryManager(gmm,25);
         ChildMemoryManager rmm2 = new ChildMemoryManager(gmm,25);
         ChildMemoryManager rmm3 = new ChildMemoryManager(gmm,25);
@@ -281,4 +131,50 @@ public class MemoryManagerTest {
         assertTrue(rmm3.getAvailableMemory() == rmm3.getMaxMemory());
         assertTrue(rmm4.getAvailableMemory() == rmm4.getMaxMemory());
     }
+
+    @Test
+    public void testConcurrentAllocation() throws Exception {
+        int THREADS = 100;
+
+        // each thread will attempt up to 100 allocations on average.
+        final GlobalMemoryManager gmm = new GlobalMemoryManager(THREADS * 
1000);
+        final AtomicInteger count = new AtomicInteger(0);
+        final CountDownLatch barrier = new CountDownLatch(THREADS);
+        final CountDownLatch barrier2 = new CountDownLatch(THREADS);
+        final CountDownLatch signal = new CountDownLatch(1);
+        /*
+         * each thread will allocate chunks of 10 bytes, until no more memory 
is available.
+         */
+        for (int i = 0; i < THREADS; i++) {
+            new Thread(new Runnable() {
+                List<MemoryChunk> chunks = new ArrayList<>();
+                @Override
+                public void run() {
+                    try {
+                        while(true) {
+                            Thread.sleep(1);
+                            chunks.add(gmm.allocate(10));
+                            count.incrementAndGet();
+                        }
+                    } catch (InsufficientMemoryException e) {
+                        barrier.countDown();
+                        // wait for the signal to go ahead
+                        try {signal.await();} catch (InterruptedException ix) 
{}
+                        for (MemoryChunk chunk : chunks) {
+                            chunk.close();
+                        }
+                        barrier2.countDown();
+                    } catch (InterruptedException ix) {}
+                }
+            }).start();
+        }
+        // wait until all threads failed an allocation
+        barrier.await();
+        // make sure all memory was used
+        assertTrue(gmm.getAvailableMemory() == 0);
+        // let the threads end, and free their memory
+        signal.countDown(); barrier2.await();
+        // make sure all memory is freed
+        assertTrue(gmm.getAvailableMemory() == gmm.getMaxMemory());
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/44c00345/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index fafd321..0b69206 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -89,7 +89,6 @@ public final class QueryServicesTestImpl extends 
BaseQueryServicesImpl {
                 .setThreadTimeoutMs(DEFAULT_THREAD_TIMEOUT_MS)
                 .setSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES)
                 .setSpoolDirectory(DEFAULT_SPOOL_DIRECTORY)
-                .setMaxMemoryWaitMs(DEFAULT_MAX_MEMORY_WAIT_MS)
                 .setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC)
                 .setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE)
                 
.setMaxServerCacheTTLMs(DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS)

Reply via email to