phet commented on a change in pull request #3415:
URL: https://github.com/apache/gobblin/pull/3415#discussion_r731379252



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -966,13 +965,15 @@ private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
 
       String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
       boolean requesterCheck = true;
-      String requesterMessage = null;
+      StringBuilder requesterMessage = new StringBuilder();
       if (serializedRequesters != null) {
         for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
+          // We use & operator instead of && because we want to increase quota for all the requesters
           requesterCheck &= incrementMapAndCheckQuota(requesterToJobCount, requester.getName(), dagNode);
-          if (!requesterCheck && requesterMessage == null) {
-            requesterMessage = "Quota exceeded for requester " + requester.getName() + " on executor " + specExecutorUri + ": quota="
-                + getQuotaForUser(requester.getName()) + ", runningJobs=" + requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode));
+          if (!requesterCheck) {
+            requesterMessage.append("Quota exceeded for requester " + requester.getName() + " on executor "
+                + specExecutorUri + ": quota=" + getQuotaForUser(requester.getName()) + ", runningJobs="
+                + requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode) + "\n"));

Review comment:
       seems reasonable to include a line for each `requester` exceeding quota, 
but once one exceeds, wouldn't this invariably add the line for every 
subsequent `requester`, whether exceeded or not?  (if that's a feature, not a 
bug, perhaps a comment...)
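   roughly, the fix might look like this sketch (illustrative only: a `String` requester name and a fixed quota of 2 stand in for `ServiceRequester` and `getQuotaForUser`; `checkQuota` here returns the message instead of throwing):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class QuotaMessageSketch {
  static final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
  static final int QUOTA = 2; // hypothetical fixed quota

  // stand-in for the PR's incrementMapAndCheckQuota: atomically increments and
  // reports whether this requester is still within quota
  static boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap, String user) {
    return quotaMap.merge(user, 1, Integer::sum) <= QUOTA;
  }

  // Returns null when every requester is within quota, else one line per offender.
  static String checkQuota(List<String> requesters) {
    boolean requesterCheck = true;
    StringBuilder requesterMessage = new StringBuilder();
    for (String requester : requesters) {
      // capture this iteration's own result; testing the accumulated
      // 'requesterCheck' would flag every requester after the first offender
      boolean withinQuota = incrementMapAndCheckQuota(requesterToJobCount, requester);
      requesterCheck &= withinQuota;
      if (!withinQuota) {
        requesterMessage.append("Quota exceeded for requester ").append(requester).append('\n');
      }
    }
    return requesterCheck ? null : requesterMessage.toString();
  }
}
```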

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -966,13 +965,15 @@ private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
 
       String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
       boolean requesterCheck = true;
-      String requesterMessage = null;
+      StringBuilder requesterMessage = new StringBuilder();
       if (serializedRequesters != null) {
         for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
+          // We use & operator instead of && because we want to increase quota for all the requesters
           requesterCheck &= incrementMapAndCheckQuota(requesterToJobCount, requester.getName(), dagNode);
-          if (!requesterCheck && requesterMessage == null) {
-            requesterMessage = "Quota exceeded for requester " + requester.getName() + " on executor " + specExecutorUri + ": quota="
-                + getQuotaForUser(requester.getName()) + ", runningJobs=" + requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode));
+          if (!requesterCheck) {
+            requesterMessage.append("Quota exceeded for requester " + requester.getName() + " on executor "
+                + specExecutorUri + ": quota=" + getQuotaForUser(requester.getName()) + ", runningJobs="
+                + requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode) + "\n"));

Review comment:
       this is only an error message, but it is susceptible to check-then-get 
errors.  specifically if another (interleaved) DMThread has just (in parallel) 
reduced the job count for that requester, the value shown for 
`requesterToJobCount.get` would not actually be in excess of the quota level.
   
   two options: either "break encapsulation" and directly call 
`getQuotaForUser` or change the return type of `incMapAndCheckQuota` to 
`Optional<Integer>`, so it can return the job count determined within (only if 
*to be* in excess) or else `Optional.absent()` when within quota.  (`Either` is 
actually truer to the intent of `Optional`, so we could use that if we like it.)
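   the second option might look like this sketch (illustrative: simplified signature with a `String` key and fixed quota; the real method also takes the map and a `DagNode`).  the offending count is computed inside the same atomic update, so the caller never re-reads the map:

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class QuotaOptionalSketch {
  static final ConcurrentHashMap<String, Integer> quotaMap = new ConcurrentHashMap<>();
  static final int QUOTA = 2; // hypothetical fixed quota

  // Optional.empty() means "within quota"; a present value is the count as
  // observed by this very increment, safe to include in the error message.
  static Optional<Integer> incrementMapAndCheckQuota(String key) {
    int count = quotaMap.merge(key, 1, Integer::sum);
    return count <= QUOTA ? Optional.empty() : Optional.of(count);
  }
}
```

the caller then builds its "runningJobs=" message from the returned value, which closes the check-then-get window.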

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -434,12 +433,12 @@ public synchronized void setActive(boolean active) {
     private static final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
     private static final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
 
-    private JobStatusRetriever jobStatusRetriever;
-    private DagStateStore dagStateStore;
-    private DagStateStore failedDagStateStore;
-    private BlockingQueue<Dag<JobExecutionPlan>> queue;
-    private BlockingQueue<String> cancelQueue;
-    private BlockingQueue<String> resumeQueue;
+    private final JobStatusRetriever jobStatusRetriever;
+    private final DagStateStore dagStateStore;
+    private final DagStateStore failedDagStateStore;

Review comment:
       there's probably a reason I didn't grok, but I couldn't figure out why the `DagManager.failedDagStateStore` was dropped while the `DagManagerThread` one was retained.  please explain.

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -984,15 +985,16 @@ private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
       }
 
       if (!requesterCheck) {
-        throw new IOException(requesterMessage);
+        throw new IOException(requesterMessage.toString());
       }
     }
 
     /**
      * Increment quota by one for the given map and key.
+     * We need synchronization on this method because quotaMap is shared among all the {@link DagManagerThread}s.
      * @return true if quota is not reached for this user or user is whitelisted, false otherwise.
      */
-    private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
+    synchronized private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
       String key = DagManagerUtils.getUserQuotaKey(user, dagNode);

Review comment:
       I don't know what to make of the quota cmp (line 1007) being on the 
user's quota, even though the count is tracked by 
`DagManagerUtils.getUserQuotaKey(user, dagNode)`.  are the semantics of 
`getQuotaForUser(user)` truly "get quota per DAG node, per user" or is there a 
possible accounting mismatch?

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -984,15 +985,16 @@ private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
       }
 
       if (!requesterCheck) {
-        throw new IOException(requesterMessage);
+        throw new IOException(requesterMessage.toString());
       }
     }
 
     /**
      * Increment quota by one for the given map and key.
+     * We need synchronization on this method because quotaMap is shared among all the {@link DagManagerThread}s.
      * @return true if quota is not reached for this user or user is whitelisted, false otherwise.
      */
-    private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
+    synchronized private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {

Review comment:
       the `requesterToJobCount` (aka `quotaMap`) is already a `ConcurrentHashMap`.  that seems a reasonable synchronization granularity, since AFAICT every requester's quota is separate.  synchronizing here (on the `DMThread` instance, anyway) wouldn't ensure atomicity *across* different threads.
   
   you're right on the need for concurrency control though.  to ensure atomic check-then-set semantics (and never drop/miss an increment), use the three-param version of `.replace`, rather than `.put`.  (`AtomicInteger.compareAndSet` would afford the same, but is overkill, since CHM already effectively synchronizes on `key`.)  to account for a potential rejection by `.replace`, you'll need a loop within.
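   that loop might be sketched as follows (illustrative names, not the PR's code; the return value is the post-increment count the caller can compare against the quota):

```java
import java.util.concurrent.ConcurrentHashMap;

public class AtomicIncrementSketch {
  static final ConcurrentHashMap<String, Integer> quotaMap = new ConcurrentHashMap<>();

  // Atomically increments the count for 'key' and returns the new value,
  // retrying until either putIfAbsent or the three-param replace succeeds.
  static int incrementAndGet(String key) {
    while (true) {
      Integer current = quotaMap.get(key);
      if (current == null) {
        // putIfAbsent returns null on success; otherwise another thread
        // installed a value first, so fall through and retry
        if (quotaMap.putIfAbsent(key, 1) == null) {
          return 1;
        }
      } else if (quotaMap.replace(key, current, current + 1)) {
        // replace(key, expected, newValue) fails if another thread raced us
        return current + 1;
      }
    }
  }
}
```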

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -1063,26 +1065,28 @@ private void releaseQuota(DagNode<JobExecutionPlan> dagNode) {
       String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
       if (proxyUser != null) {
         String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
-        if (proxyUserToJobCount.containsKey(proxyUserKey) && proxyUserToJobCount.get(proxyUserKey) > 0) {
-          proxyUserToJobCount.put(proxyUserKey, proxyUserToJobCount.get(proxyUserKey) - 1);
-        }
+        decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
       }
 
       String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
       if (serializedRequesters != null) {
         try {
           for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
             String requesterKey = DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
-            if (requesterToJobCount.containsKey(requesterKey) && requesterToJobCount.get(requesterKey) > 0) {
-              requesterToJobCount.put(requesterKey, requesterToJobCount.get(requesterKey) - 1);
-            }
+            decrementQuotaUsage(requesterToJobCount, requesterKey);
           }
         } catch (IOException e) {
           log.error("Failed to release quota for requester list " + serializedRequesters, e);
         }
       }
     }
 
+    synchronized private void decrementQuotaUsage(Map<String, Integer> quotaMap, String user) {
+      if (quotaMap.containsKey(user) && quotaMap.get(user) > 0) {
+        quotaMap.put(user, quotaMap.get(user) - 1);
+      }

Review comment:
       safer:
   ```
   Integer v;
   do {
     v = quotaMap.get(user);
   } while (v != null && v > 0 && !quotaMap.replace(user, v, v - 1));
   ```
   (since mutation is possible from different DMThreads, `synchronized` on the thread instance wouldn't adequately protect.)

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -194,9 +191,9 @@ public String toString() {
public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean instrumentationEnabled) {

Review comment:
       (not under review, but since essential to quota mgmt.)
   
   for peace of mind, consider making `perUserQuota` an `ImmutableMap`
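   e.g. (a hypothetical sketch; the real `perUserQuota` is parsed from `Config`, and with Guava on the classpath `ImmutableMap.copyOf(parsed)` is the direct analogue of the `Map.copyOf` used here):

```java
import java.util.HashMap;
import java.util.Map;

public class ImmutableQuotaSketch {
  // hypothetical construction; stands in for parsing per-user quotas from Config
  static Map<String, Integer> buildPerUserQuota() {
    Map<String, Integer> parsed = new HashMap<>();
    parsed.put("alice", 5);
    // Map.copyOf (Java 10+) returns an unmodifiable copy, so any later
    // accidental mutation fails fast with UnsupportedOperationException
    return Map.copyOf(parsed);
  }
}
```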

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -984,15 +985,16 @@ private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
       }
 
       if (!requesterCheck) {
-        throw new IOException(requesterMessage);
+        throw new IOException(requesterMessage.toString());

Review comment:
       how do we maintain the integrity of `requesterToJobCount` when this exception is thrown?  the incrementing has already happened, yet when `submitJob` catches, I couldn't immediately verify any compensatory "roll-back" of the quota accounting.
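   one way to keep the accounting consistent might be a sketch like this (illustrative only, not existing code: `String` requester names and a fixed quota of 1 stand in for the real types; the point is undoing every increment made in this call before throwing, since the rejected job will never run and so never release quota):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class QuotaRollbackSketch {
  static final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
  static final int QUOTA = 1; // hypothetical fixed quota

  static boolean incrementAndCheck(String key) {
    return requesterToJobCount.merge(key, 1, Integer::sum) <= QUOTA;
  }

  static void checkQuota(List<String> requesters) throws IOException {
    List<String> incremented = new ArrayList<>();
    boolean ok = true;
    for (String r : requesters) {
      ok &= incrementAndCheck(r);
      incremented.add(r); // remember every key we bumped, pass or fail
    }
    if (!ok) {
      // compensate: undo each increment so the rejected job leaves no residue
      for (String r : incremented) {
        requesterToJobCount.computeIfPresent(r, (k, v) -> v > 1 ? v - 1 : null);
      }
      throw new IOException("Quota exceeded");
    }
  }
}
```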

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -166,15 +164,14 @@ public String toString() {
     }
   }
 
-  private BlockingQueue<Dag<JobExecutionPlan>>[] queue;
-  private BlockingQueue<String>[] cancelQueue;
-  private BlockingQueue<String>[] resumeQueue;
+  private final BlockingQueue<Dag<JobExecutionPlan>>[] queue;

Review comment:
       may be room for improving/clarifying name
   (also potentially same for `scheduledExecutorPool`)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

