>From Wail Alkowaileet <[email protected]>:
Wail Alkowaileet has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18144 )
Change subject: [WIP] Fix GlobalResourceIdFactory race cond.
......................................................................
[WIP] Fix GlobalResourceIdFactory race cond.
Change-Id: Ic7ff15abbc70277a9f1ae340314335253aa23308
---
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
1 file changed, 100 insertions(+), 51 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/44/18144/1
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 30877d9..c06e600 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -18,9 +18,10 @@
*/
package org.apache.asterix.runtime.transaction;
-import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
@@ -32,7 +33,6 @@
import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
-import it.unimi.dsi.fastutil.longs.LongPriorityQueues;
/**
* A resource id factory that generates unique resource ids across all NCs by
requesting
@@ -41,23 +41,25 @@
public class GlobalResourceIdFactory implements IResourceIdFactory {
private static final Logger LOGGER = LogManager.getLogger();
+ private static final long INVALID_ID = -1L;
+ private static final int MAX_NUMBER_OF_ATTEMPTS = 3;
private final INCServiceContext serviceCtx;
private final LongPriorityQueue resourceIds;
- private final LinkedBlockingQueue<ResourceIdRequestResponseMessage>
resourceIdResponseQ;
private final String nodeId;
private final int initialBlockSize;
private final int maxBlockSize;
+ private final AtomicInteger numberOfFailedRequests;
private int currentBlockSize;
private volatile boolean reset = false;
public GlobalResourceIdFactory(INCServiceContext serviceCtx, int
initialBlockSize) {
this.serviceCtx = serviceCtx;
- this.resourceIdResponseQ = new LinkedBlockingQueue<>();
this.nodeId = serviceCtx.getNodeId();
this.initialBlockSize = initialBlockSize;
maxBlockSize = initialBlockSize * 2;
currentBlockSize = initialBlockSize;
- resourceIds = LongPriorityQueues.synchronize(new
LongArrayFIFOQueue(initialBlockSize));
+ resourceIds = new LongArrayFIFOQueue(initialBlockSize); // no longer wrapped by LongPriorityQueues.synchronize: every access (and every wait/notify) must hold the resourceIds monitor
+ numberOfFailedRequests = new AtomicInteger(); // number of consecutive failed block requests; set back to 0 on success
}
public synchronized void addNewIds(ResourceIdRequestResponseMessage
resourceIdResponse)
@@ -70,52 +72,19 @@
currentBlockSize);
return;
}
- resourceIdResponseQ.put(resourceIdResponse);
+ populateIDs(resourceIdResponse);
}
@Override
public long createId() throws HyracksDataException {
- synchronized (resourceIds) {
- if (reset) {
- resourceIds.clear();
- resourceIdResponseQ.clear();
- reset = false;
- }
+ resetIDsIfNeeded(); // apply a pending reset first: drops the cached ids and requests a new block
+ long id = getId(); // INVALID_ID when no cached id is available
+ while (id == INVALID_ID) { // another thread may take the ids first, so retry until one is obtained
+ waitForID(); // blocks until at least one id is cached
+ id = getId();
}
- try {
- final long resourceId = resourceIds.dequeueLong();
- if (resourceIds.isEmpty()) {
- serviceCtx.getControllerService().getExecutor().submit(() -> {
- try {
- requestNewBlock();
- } catch (Exception e) {
- LOGGER.warn("failed on preemptive block request", e);
- }
- });
- }
- return resourceId;
- } catch (NoSuchElementException e) {
- // fallthrough
- }
- try {
- // if there already exists a response, use it
- ResourceIdRequestResponseMessage response =
resourceIdResponseQ.poll();
- if (response == null) {
- requestNewBlock();
- response = resourceIdResponseQ.take();
- }
- if (response.getException() != null) {
- throw HyracksDataException.create(response.getException());
- }
- // take the first id, queue the rest
- final long startingId = response.getResourceId();
- for (int i = 1; i < response.getBlockSize(); i++) {
- resourceIds.enqueue(startingId + i);
- }
- return startingId;
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
+
+ return id; // never INVALID_ID here: the loop above only exits with a real id
}
@Override
@@ -128,9 +97,80 @@
LOGGER.debug("current resource ids block size: {}", currentBlockSize);
}
- protected synchronized void requestNewBlock() throws Exception {
- // queue is empty; request a new block
- ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId,
currentBlockSize);
- ((INCMessageBroker)
serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+ private void populateIDs(ResourceIdRequestResponseMessage response) {
+ synchronized (resourceIds) {
+ long startingId = response.getResourceId();
+ for (int i = 0; i < response.getBlockSize(); i++) {
+ resourceIds.enqueue(startingId + i);
+ }
+ // Notify all waiting threads that a block of IDs was acquired
+ resourceIds.notifyAll();
+ }
+ }
+
+ private void resetIDsIfNeeded() throws HyracksDataException {
+ synchronized (resourceIds) {
+ if (reset) {
+ resourceIds.clear();
+ reset = false;
+ // First block
+ requestNewBlock();
+ }
+ }
+ }
+
+ private long getId() throws HyracksDataException {
+ long id = INVALID_ID;
+ int size;
+ synchronized (resourceIds) {
+ size = resourceIds.size();
+ if (size > 0) {
+ id = resourceIds.dequeueLong();
+ }
+ }
+
+ if (size == 1 || numberOfFailedRequests.get() > 0) {
+ // The last ID was taken. Preemptively request a new block.
+ // Or the last request failed, retry
+ requestNewBlock();
+ }
+
+ return id;
+ }
+
+ private void waitForID() throws HyracksDataException {
+ try {
+ synchronized (resourceIds) {
+ while (resourceIds.isEmpty()) {
+ resourceIds.wait();
+ }
+ }
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private void requestNewBlock() throws HyracksDataException {
+ int attempts = numberOfFailedRequests.get();
+ if (attempts >= MAX_NUMBER_OF_ATTEMPTS) {
+ // Notify all waiting threads so they can fail as well
+ resourceIds.notifyAll();
+ throw new RuntimeDataException(ErrorCode.ILLEGAL_STATE, "New block
request was attempted (" + attempts
+ + " times) - exceeding the maximum number of allowed
retries. See the logs for more information.");
+ }
+
+ serviceCtx.getControllerService().getExecutor().submit(() -> {
+ try {
+ ResourceIdRequestMessage msg = new
ResourceIdRequestMessage(nodeId, currentBlockSize);
+ ((INCMessageBroker)
serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+ // Reset the number failures
+ numberOfFailedRequests.set(0);
+ } catch (Exception e) {
+ LOGGER.warn("failed to request a new block", e);
+ numberOfFailedRequests.incrementAndGet();
+ // Notify a waiting thread (if any) to request a new block
+ resourceIds.notify();
+ }
+ });
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18144
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ic7ff15abbc70277a9f1ae340314335253aa23308
Gerrit-Change-Number: 18144
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange