[ 
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=908146&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-908146
 ]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Mar/24 19:27
            Start Date: 04/Mar/24 19:27
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3858:
URL: https://github.com/apache/gobblin/pull/3858#discussion_r1511687787


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitor.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+
+
+/**
+ * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} 
schema to process Kafka messages received
+ * from its corresponding consumer client. This monitor responds to requests 
to resume or delete a flow and acts as a
+ * connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class DagProcEngineEnabledDagActionStoreChangeMonitor extends 
DagActionStoreChangeMonitor {
+  private final DagManagement dagManagement;
+
+  // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
+  // client itself to determine all Kafka related information dynamically 
rather than through the config.
+  public DagProcEngineEnabledDagActionStoreChangeMonitor(String topic, Config 
config, DagManager dagManager, int numThreads,
+      FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore 
dagActionStore,
+      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
+    // Differentiate group id for each host
+    super(topic, config, dagManager, numThreads, flowCatalog, orchestrator, 
dagActionStore, isMultiActiveSchedulerEnabled);
+    this.dagManagement = dagManagement;
+  }
+
+  /**
+   * This implementation passes on the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} to the
+   * {@link DagManagement} instead of finding a {@link 
org.apache.gobblin.runtime.api.FlowSpec} passing the spec to {@link 
Orchestrator}.
+   */
+  @Override
+  protected void handleDagAction(DagActionStore.DagAction dagAction, boolean 
isStartup) {
+    log.info("(" + (isStartup ? "on-startup" : "post-startup") + ") DagAction 
change ({}) received for flow: {}",
+        dagAction.getFlowActionType(), dagAction);
+    LaunchSubmissionMetricProxy launchSubmissionMetricProxy = isStartup ? 
ON_STARTUP : POST_STARTUP;
+    try {
+      // todo - add actions for other other type of dag actions
+      if 
(dagAction.getFlowActionType().equals(DagActionStore.FlowActionType.LAUNCH)) {
+        // If multi-active scheduler is NOT turned on we should not receive 
these type of events
+        if (!this.isMultiActiveSchedulerEnabled) {
+          this.unexpectedErrors.mark();
+          throw new RuntimeException(String.format("Received LAUNCH dagAction 
while not in multi-active scheduler "
+              + "mode for flowAction: %s", dagAction));
+        }
+        dagManagement.addDagAction(dagAction);
+      } else {
+        log.warn("Received unsupported dagAction {}. Expected to be a KILL, 
RESUME, or LAUNCH", dagAction.getFlowActionType());

Review Comment:
   if we enable this monitor through config it can only be done after 
implementing them all right now. Is that the intention or we want to pass other 
actions through old model?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 908146)
    Time Spent: 26h 50m  (was: 26h 40m)

> Refactor code to move current in-memory references to new design for REST 
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 26h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to