umustafi commented on code in PR #3516:
URL: https://github.com/apache/gobblin/pull/3516#discussion_r888520617


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java:
##########
@@ -283,14 +301,61 @@ public boolean apply(Void input) {
     
     Assert.assertEquals(schedulerService.getScheduler().getJobGroupNames().size(), 0);
   }
 
+  @Test
+  public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {
+    File specDir = Files.createTempDir();
+
+    Properties properties = new Properties();
+    properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+    FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+    ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+
+
+    serviceLauncher.addService(flowCatalog);
+    serviceLauncher.start();
+
+    FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"), "flowName0", "group1",
+        ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true")));
+    FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"), "flowName1", "group1",
+        ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true")));
+
+    Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+    SpecCompiler mockSpecCompiler = Mockito.mock(SpecCompiler.class);
+    when(mockOrchestrator.getSpecCompiler()).thenReturn(mockSpecCompiler);
+    Dag<JobExecutionPlan> mockDag0 = this.buildDag(flowSpec0.getConfig(), "0");
+    Dag<JobExecutionPlan> mockDag1 = this.buildDag(flowSpec1.getConfig(), "1");
+    when(mockSpecCompiler.compileFlow(flowSpec0)).thenReturn(mockDag0);
+    when(mockSpecCompiler.compileFlow(flowSpec1)).thenReturn(mockDag1);
+
+    SchedulerService schedulerService = new SchedulerService(new Properties());
+    // Mock a GaaS scheduler.
+    GobblinServiceJobScheduler scheduler = new GobblinServiceJobScheduler("testscheduler",
+        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new UserQuotaManager(quotaConfig)), Optional.absent());
+
+    schedulerService.startAsync().awaitRunning();
+    scheduler.startUp();
+    scheduler.setActive(true);
+
+    scheduler.onAddSpec(flowSpec0); //Ignore the response for this request
+    AddSpecResponse<String> response1 = scheduler.onAddSpec(flowSpec1);
+    Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
+
+    Assert.assertEquals(response1.getValue(), "org.apache.gobblin.exception.QuotaExceededException: Quota exceeded for flowgroup group1 on executor jobExecutor : quota=1, requests above quota=1\n");
+    // Second flow should not be added to scheduled flows since it was rejected
+    Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
+    // set scheduler to be inactive and unschedule flows
+    scheduler.setActive(false);
+    Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0);

Review Comment:
   Is this part just cleanup for the scheduler instance used in the test?



##########
gobblin-utility/src/main/java/org/apache/gobblin/exception/QuotaExceededException.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.exception;
+
+import java.io.IOException;
+
+
+public class QuotaExceededException extends IOException {
+
+  public QuotaExceededException(String message) {
+    super(message);

Review Comment:
   Where does the message above, `Quota exceeded for flowgroup group1 on
   executor jobExecutor...`, originate? Is the format of the exception message
   defined in another rb? Would it make sense for this exception's constructor to
   contain the fixed `Quota exceeded for...` prefix and parameterize the rest?
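
A minimal sketch of what that suggestion could look like, assuming the message format matches the string asserted in `testJobSchedulerAddFlowQuotaExceeded`. The four-argument constructor and its parameter names (`flowGroup`, `executor`, `quota`, `requestsAboveQuota`) are hypothetical and not part of this PR; the class is declared without `public` and without the package statement only so the snippet compiles on its own.

```java
import java.io.IOException;

// Sketch only: the parameterized constructor below is a hypothetical
// addition, not the code in this PR.
class QuotaExceededException extends IOException {

  // Existing constructor from the diff: the caller builds the whole message.
  QuotaExceededException(String message) {
    super(message);
  }

  // Hypothetical constructor: the fixed "Quota exceeded for..." prefix lives
  // here and only the variable parts are passed in.
  QuotaExceededException(String flowGroup, String executor, long quota, long requestsAboveQuota) {
    super(String.format(
        "Quota exceeded for flowgroup %s on executor %s : quota=%d, requests above quota=%d",
        flowGroup, executor, quota, requestsAboveQuota));
  }
}
```

With something like this, callers would write `new QuotaExceededException("group1", "jobExecutor", 1, 1)` and the wording would be defined in one place. Note that this covers only the flowgroup case seen in the test; any other quota types would need their own handling.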



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -360,16 +361,21 @@ public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
         responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
       }
     }
+    AddSpecResponse<String> schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
 
-    if (isCompileSuccessful(responseMap)) {
+    if (isCompileSuccessful(schedulerResponse.getValue())) {
       synchronized (syncObject) {
         try {
-          if (!flowSpec.isExplain()) {
-            long startTime = System.currentTimeMillis();
-            specStore.addSpec(spec);
-            metrics.updatePutSpecTime(startTime);
+          if (schedulerResponse.getValue().contains(QuotaExceededException.class.getSimpleName())) {
+            responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>(schedulerResponse.getValue()));

Review Comment:
   Why do we only add the response value if the exception is present? I see
   that the value is otherwise "true". Is there a "false" case where we don't put
   the exception reason?
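
To make the question concrete, here is a rough, standalone sketch of the three outcomes being asked about. It is not the `FlowCatalog` code: the class `CompilationStatusSketch`, the method `compilationStatus`, and the assumption that a `null` scheduler response means compilation failed are all hypothetical, since the rest of the hunk is not shown here.

```java
// Sketch only: a hypothetical helper that spells out the three cases.
// It does not use or mirror any real Gobblin API.
class CompilationStatusSketch {

  // Same marker the diff checks via QuotaExceededException.class.getSimpleName().
  static final String QUOTA_EXCEPTION_NAME = "QuotaExceededException";

  // Returns the value that would be stored under the "compilation successful" key.
  static String compilationStatus(String schedulerResponseValue) {
    if (schedulerResponseValue == null) {
      // Assumption: no scheduler response means the flow did not compile.
      return "false";
    }
    if (schedulerResponseValue.contains(QUOTA_EXCEPTION_NAME)) {
      // Quota rejection: surface the reason instead of a bare boolean.
      return schedulerResponseValue;
    }
    return "true";
  }
}
```

Under these assumptions the "false" branch is the only one that carries no reason, which is the gap the question points at.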



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
