>From Wail Alkowaileet <[email protected]>:

Wail Alkowaileet has uploaded this change for review. (https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18144)


Change subject: [WIP] Fix GlobalResourceIdFactory race condition
......................................................................

[WIP] Fix GlobalResourceIdFactory race condition

Change-Id: Ic7ff15abbc70277a9f1ae340314335253aa23308
---
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
1 file changed, 100 insertions(+), 51 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/44/18144/1

diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 30877d9..c06e600 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -18,9 +18,10 @@
  */
 package org.apache.asterix.runtime.transaction;

-import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;

+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
 import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
@@ -32,7 +33,6 @@

 import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
 import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
-import it.unimi.dsi.fastutil.longs.LongPriorityQueues;

 /**
  * A resource id factory that generates unique resource ids across all NCs by requesting
@@ -41,23 +41,27 @@
 public class GlobalResourceIdFactory implements IResourceIdFactory {

     private static final Logger LOGGER = LogManager.getLogger();
+    private static final long INVALID_ID = -1L;
+    private static final int MAX_NUMBER_OF_ATTEMPTS = 3;
+    /** Upper bound (in milliseconds) for a single wait in waitForID() before the pool is re-checked. */
+    private static final long WAIT_TIMEOUT_MS = 1000L;
     private final INCServiceContext serviceCtx;
     private final LongPriorityQueue resourceIds;
-    private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
     private final String nodeId;
     private final int initialBlockSize;
     private final int maxBlockSize;
+    private final AtomicInteger numberOfFailedRequests;
     private int currentBlockSize;
     private volatile boolean reset = false;

     public GlobalResourceIdFactory(INCServiceContext serviceCtx, int initialBlockSize) {
         this.serviceCtx = serviceCtx;
-        this.resourceIdResponseQ = new LinkedBlockingQueue<>();
         this.nodeId = serviceCtx.getNodeId();
         this.initialBlockSize = initialBlockSize;
         maxBlockSize = initialBlockSize * 2;
         currentBlockSize = initialBlockSize;
-        resourceIds = LongPriorityQueues.synchronize(new LongArrayFIFOQueue(initialBlockSize));
+        resourceIds = new LongArrayFIFOQueue(initialBlockSize);
+        numberOfFailedRequests = new AtomicInteger();
     }

     public synchronized void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse)
@@ -70,52 +74,19 @@
                     currentBlockSize);
             return;
         }
-        resourceIdResponseQ.put(resourceIdResponse);
+        populateIDs(resourceIdResponse);
     }

     @Override
     public long createId() throws HyracksDataException {
-        synchronized (resourceIds) {
-            if (reset) {
-                resourceIds.clear();
-                resourceIdResponseQ.clear();
-                reset = false;
-            }
+        resetIDsIfNeeded();
+        long id = getId();
+        while (id == INVALID_ID) {
+            waitForID();
+            id = getId();
         }
-        try {
-            final long resourceId = resourceIds.dequeueLong();
-            if (resourceIds.isEmpty()) {
-                serviceCtx.getControllerService().getExecutor().submit(() -> {
-                    try {
-                        requestNewBlock();
-                    } catch (Exception e) {
-                        LOGGER.warn("failed on preemptive block request", e);
-                    }
-                });
-            }
-            return resourceId;
-        } catch (NoSuchElementException e) {
-            // fallthrough
-        }
-        try {
-            // if there already exists a response, use it
-            ResourceIdRequestResponseMessage response = resourceIdResponseQ.poll();
-            if (response == null) {
-                requestNewBlock();
-                response = resourceIdResponseQ.take();
-            }
-            if (response.getException() != null) {
-                throw HyracksDataException.create(response.getException());
-            }
-            // take the first id, queue the rest
-            final long startingId = response.getResourceId();
-            for (int i = 1; i < response.getBlockSize(); i++) {
-                resourceIds.enqueue(startingId + i);
-            }
-            return startingId;
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
+
+        return id;
     }

     @Override
@@ -128,9 +99,132 @@
         LOGGER.debug("current resource ids block size: {}", currentBlockSize);
     }

-    protected synchronized void requestNewBlock() throws Exception {
-        // queue is empty; request a new block
-        ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, currentBlockSize);
-        ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+    /**
+     * Adds the IDs of a block response to the local pool and wakes up all threads waiting for an ID. A response
+     * carrying an exception is counted as a failed request instead, so that waiting threads retry (or fail)
+     * rather than waiting for IDs that will never arrive.
+     */
+    private void populateIDs(ResourceIdRequestResponseMessage response) {
+        synchronized (resourceIds) {
+            if (response.getException() != null) {
+                LOGGER.warn("failed to acquire a new block of resource ids", response.getException());
+                numberOfFailedRequests.incrementAndGet();
+            } else {
+                long startingId = response.getResourceId();
+                for (int i = 0; i < response.getBlockSize(); i++) {
+                    resourceIds.enqueue(startingId + i);
+                }
+                // A block was acquired: reset the number of consecutive failures
+                numberOfFailedRequests.set(0);
+            }
+            // Notify all waiting threads: either IDs are available or they have to retry (or fail)
+            resourceIds.notifyAll();
+        }
+    }
+
+    /**
+     * Clears the local pool if the {@code reset} flag is set and then requests a first block. The request is
+     * issued after leaving the pool's monitor so that no lock is held while submitting it.
+     */
+    private void resetIDsIfNeeded() throws HyracksDataException {
+        boolean requestFirstBlock = false;
+        synchronized (resourceIds) {
+            if (reset) {
+                resourceIds.clear();
+                reset = false;
+                requestFirstBlock = true;
+            }
+        }
+        if (requestFirstBlock) {
+            requestNewBlock(true);
+        }
+    }
+
+    /**
+     * Takes the next ID from the local pool and requests a new block when the pool is (about to be) empty or when
+     * the previous request failed.
+     *
+     * @return the next ID, or {@link #INVALID_ID} if the pool was empty
+     * @throws HyracksDataException if the pool was empty and too many consecutive block requests failed
+     */
+    private long getId() throws HyracksDataException {
+        long id = INVALID_ID;
+        int size;
+        synchronized (resourceIds) {
+            size = resourceIds.size();
+            if (size > 0) {
+                id = resourceIds.dequeueLong();
+            }
+        }
+
+        if (size <= 1 || numberOfFailedRequests.get() > 0) {
+            // The pool was empty or the last ID was taken: request a new block.
+            // Or the last request failed: retry.
+            // Only fail when there is no ID to return, so that an already dequeued ID is never lost.
+            requestNewBlock(id == INVALID_ID);
+        }
+
+        return id;
+    }
+
+    /**
+     * If the local pool is empty, waits for a notification for at most {@link #WAIT_TIMEOUT_MS} milliseconds.
+     * The wait is bounded so that a missed notification or a lost response cannot block the caller forever;
+     * the caller must re-check the pool afterwards.
+     */
+    private void waitForID() throws HyracksDataException {
+        synchronized (resourceIds) {
+            if (!resourceIds.isEmpty()) {
+                return;
+            }
+            try {
+                resourceIds.wait(WAIT_TIMEOUT_MS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    /**
+     * Asynchronously requests a new block of IDs from the primary CC. The request is submitted even when the
+     * maximum number of consecutive failures has been reached, so that a later success can reset the count.
+     *
+     * @param failIfExhausted whether to throw once {@link #MAX_NUMBER_OF_ATTEMPTS} consecutive requests failed
+     */
+    private void requestNewBlock(boolean failIfExhausted) throws HyracksDataException {
+        int attempts = numberOfFailedRequests.get();
+        serviceCtx.getControllerService().getExecutor().submit(this::sendNewBlockRequest);
+        if (failIfExhausted && attempts >= MAX_NUMBER_OF_ATTEMPTS) {
+            // Notify all waiting threads so they can fail as well
+            synchronized (resourceIds) {
+                resourceIds.notifyAll();
+            }
+            throw new RuntimeDataException(ErrorCode.ILLEGAL_STATE, "New block request was attempted (" + attempts
+                    + " times) - exceeding the maximum number of allowed retries. See the logs for more information.");
+        }
+    }
+
+    /**
+     * Sends a single block request to the primary CC. A failure is counted and waiting threads are notified so
+     * that they can retry (or fail).
+     */
+    private void sendNewBlockRequest() {
+        try {
+            int blockSize;
+            // Read under this object's monitor, like the synchronized addNewIds does
+            synchronized (this) {
+                blockSize = currentBlockSize;
+            }
+            ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, blockSize);
+            ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+        } catch (Exception e) {
+            LOGGER.warn("failed to request a new block", e);
+            numberOfFailedRequests.incrementAndGet();
+            // Notify waiting threads (if any) so they can retry or fail
+            synchronized (resourceIds) {
+                resourceIds.notifyAll();
+            }
+        }
     }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18144
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ic7ff15abbc70277a9f1ae340314335253aa23308
Gerrit-Change-Number: 18144
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange

Reply via email to