[ 
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=909017&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-909017
 ]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Mar/24 21:54
            Start Date: 08/Mar/24 21:54
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3858:
URL: https://github.com/apache/gobblin/pull/3858#discussion_r1518342507


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+import static org.mockito.Mockito.spy;
+
+@Slf4j
+public class DagProcessingEngineTest {
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+  DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
+  DagManagementTaskStreamImpl dagManagementTaskStream;
+  DagProcessingEngine dagProcessingEngine;
+  DagTaskStream dagTaskStream;
+  DagProcFactory dagProcFactory;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null);
+    dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    this.dagManagementTaskStream =
+        new DagManagementTaskStreamImpl(config, Optional.empty(), 
dagManagementStateStore);
+    this.dagManagementTaskStream.setActive(true);
+    this.dagProcFactory = new DagProcFactory();
+    this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
+        this.dagManagementTaskStream, this.dagProcFactory, 
dagManagementStateStore);
+    this.dagTaskStream = spy(new MockedDagTaskStream());
+    this.dagProcessingEngine =
+        new DagProcessingEngine(config, dagTaskStream, this.dagProcFactory, 
dagManagementStateStore);
+  }
+
+  static class MockedDagTaskStream implements DagTaskStream {
+    public static final int MAX_NUM_OF_TASKS = 10;
+    volatile int i=0;
+
+    @Override
+    public boolean hasNext() {
+      return true;
+    }
+
+    @Override
+    public synchronized DagTask next() {
+      i++;
+      if (i <= MAX_NUM_OF_TASKS) {
+        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, "1234" + i, DagActionStore.FlowActionType.LAUNCH), null);
+      } else {
+        throw new RuntimeException("Max num of tasks reached");
+      }
+    }
+  }
+
+  static class MockedDagTask extends DagTask<MockedDagProc> {
+
+    public MockedDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+      super(dagAction, leaseObtainedStatus);
+    }
+
+    @Override
+    public MockedDagProc host(DagTaskVisitor<MockedDagProc> visitor) {
+      return new MockedDagProc();
+    }
+
+    @Override
+    public boolean conclude() throws IOException {
+      return false;
+    }
+  }
+
+  static class MockedDagProc extends DagProc<Void, Void> {
+
+    @Override
+    protected Void initialize(DagManagementStateStore dagManagementStateStore) 
{
+      return null;
+    }
+
+    @Override
+    protected Void act(DagManagementStateStore dagManagementStateStore, Void 
state) {
+      return null;
+    }
+
+    @Override
+    protected void sendNotification(Void result, EventSubmitter 
eventSubmitter) {
+    }
+
+    @Override
+    protected void commit(DagManagementStateStore dagManagementStateStore, 
Void result) {
+    }
+  }
+
+  // This tests verifies that
+  @Test
+  public void dagProcessingTest() throws InterruptedException, 
TimeoutException {
+    int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS + 
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;

Review Comment:
   actually don't we expect 2x that number of invocations, given that we have 
both hasNext() and next()?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+import static org.mockito.Mockito.spy;
+
+@Slf4j
+public class DagProcessingEngineTest {
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+  DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
+  DagManagementTaskStreamImpl dagManagementTaskStream;
+  DagProcessingEngine dagProcessingEngine;
+  DagTaskStream dagTaskStream;
+  DagProcFactory dagProcFactory;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null);
+    dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    this.dagManagementTaskStream =
+        new DagManagementTaskStreamImpl(config, Optional.empty(), 
dagManagementStateStore);
+    this.dagManagementTaskStream.setActive(true);
+    this.dagProcFactory = new DagProcFactory();
+    this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
+        this.dagManagementTaskStream, this.dagProcFactory, 
dagManagementStateStore);
+    this.dagTaskStream = spy(new MockedDagTaskStream());
+    this.dagProcessingEngine =
+        new DagProcessingEngine(config, dagTaskStream, this.dagProcFactory, 
dagManagementStateStore);
+  }
+
+  static class MockedDagTaskStream implements DagTaskStream {
+    public static final int MAX_NUM_OF_TASKS = 10;
+    volatile int i=0;
+
+    @Override
+    public boolean hasNext() {
+      return true;
+    }
+
+    @Override
+    public synchronized DagTask next() {
+      i++;
+      if (i <= MAX_NUM_OF_TASKS) {
+        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, "1234" + i, DagActionStore.FlowActionType.LAUNCH), null);
+      } else {
+        throw new RuntimeException("Max num of tasks reached");
+      }
+    }
+  }
+
+  static class MockedDagTask extends DagTask<MockedDagProc> {
+
+    public MockedDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+      super(dagAction, leaseObtainedStatus);
+    }
+
+    @Override
+    public MockedDagProc host(DagTaskVisitor<MockedDagProc> visitor) {
+      return new MockedDagProc();
+    }
+
+    @Override
+    public boolean conclude() throws IOException {
+      return false;
+    }
+  }
+
+  static class MockedDagProc extends DagProc<Void, Void> {
+
+    @Override
+    protected Void initialize(DagManagementStateStore dagManagementStateStore) 
{
+      return null;
+    }
+
+    @Override
+    protected Void act(DagManagementStateStore dagManagementStateStore, Void 
state) {
+      return null;
+    }
+
+    @Override
+    protected void sendNotification(Void result, EventSubmitter 
eventSubmitter) {
+    }
+
+    @Override
+    protected void commit(DagManagementStateStore dagManagementStateStore, 
Void result) {
+    }
+  }
+
+  // This tests verifies that
+  @Test
+  public void dagProcessingTest() throws InterruptedException, 
TimeoutException {
+    int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS + 
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;

Review Comment:
   actually don't we expect 2x that number of invocations, given that we have 
both `hasNext()` and `next()`?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 909017)
    Time Spent: 31h  (was: 30h 50m)

> Refactor code to move current in-memory references to new design for REST 
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 31h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to