[
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=909017&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-909017
]
ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 08/Mar/24 21:54
Start Date: 08/Mar/24 21:54
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3858:
URL: https://github.com/apache/gobblin/pull/3858#discussion_r1518342507
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+import static org.mockito.Mockito.spy;
+
+@Slf4j
+public class DagProcessingEngineTest {
+ private static final String TEST_USER = "testUser";
+ private static final String TEST_PASSWORD = "testPassword";
+ private static final String TEST_TABLE = "quotas";
+ static ITestMetastoreDatabase testMetastoreDatabase;
+ DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
+ DagManagementTaskStreamImpl dagManagementTaskStream;
+ DagProcessingEngine dagProcessingEngine;
+ DagTaskStream dagTaskStream;
+ DagProcFactory dagProcFactory;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ // Setting up mock DB
+ testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+ Config config;
+ ConfigBuilder configBuilder = ConfigBuilder.create();
+
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
testMetastoreDatabase.getJdbcUrl())
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
+ config = configBuilder.build();
+
+ // Constructing TopologySpecMap.
+ Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+ String specExecInstance = "mySpecExecutor";
+ TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+ URI specExecURI = new URI(specExecInstance);
+ topologySpecMap.put(specExecURI, topologySpec);
+ MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null);
+ dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+ this.dagManagementTaskStream =
+ new DagManagementTaskStreamImpl(config, Optional.empty(),
dagManagementStateStore);
+ this.dagManagementTaskStream.setActive(true);
+ this.dagProcFactory = new DagProcFactory();
+ this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
+ this.dagManagementTaskStream, this.dagProcFactory,
dagManagementStateStore);
+ this.dagTaskStream = spy(new MockedDagTaskStream());
+ this.dagProcessingEngine =
+ new DagProcessingEngine(config, dagTaskStream, this.dagProcFactory,
dagManagementStateStore);
+ }
+
+ static class MockedDagTaskStream implements DagTaskStream {
+ public static final int MAX_NUM_OF_TASKS = 10;
+ volatile int i=0;
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public synchronized DagTask next() {
+ i++;
+ if (i <= MAX_NUM_OF_TASKS) {
+ return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-"
+ i, "1234" + i, DagActionStore.FlowActionType.LAUNCH), null);
+ } else {
+ throw new RuntimeException("Max num of tasks reached");
+ }
+ }
+ }
+
+ static class MockedDagTask extends DagTask<MockedDagProc> {
+
+ public MockedDagTask(DagActionStore.DagAction dagAction,
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ super(dagAction, leaseObtainedStatus);
+ }
+
+ @Override
+ public MockedDagProc host(DagTaskVisitor<MockedDagProc> visitor) {
+ return new MockedDagProc();
+ }
+
+ @Override
+ public boolean conclude() throws IOException {
+ return false;
+ }
+ }
+
+ static class MockedDagProc extends DagProc<Void, Void> {
+
+ @Override
+ protected Void initialize(DagManagementStateStore dagManagementStateStore)
{
+ return null;
+ }
+
+ @Override
+ protected Void act(DagManagementStateStore dagManagementStateStore, Void
state) {
+ return null;
+ }
+
+ @Override
+ protected void sendNotification(Void result, EventSubmitter
eventSubmitter) {
+ }
+
+ @Override
+ protected void commit(DagManagementStateStore dagManagementStateStore,
Void result) {
+ }
+ }
+
+ // This tests verifies that
+ @Test
+ public void dagProcessingTest() throws InterruptedException,
TimeoutException {
+ int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS +
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;
Review Comment:
Actually, don't we expect 2x that number of invocations, given that both
hasNext() and next() are invoked on the stream?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+import static org.mockito.Mockito.spy;
+
+@Slf4j
+public class DagProcessingEngineTest {
+ private static final String TEST_USER = "testUser";
+ private static final String TEST_PASSWORD = "testPassword";
+ private static final String TEST_TABLE = "quotas";
+ static ITestMetastoreDatabase testMetastoreDatabase;
+ DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
+ DagManagementTaskStreamImpl dagManagementTaskStream;
+ DagProcessingEngine dagProcessingEngine;
+ DagTaskStream dagTaskStream;
+ DagProcFactory dagProcFactory;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ // Setting up mock DB
+ testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+ Config config;
+ ConfigBuilder configBuilder = ConfigBuilder.create();
+
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
testMetastoreDatabase.getJdbcUrl())
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
+ config = configBuilder.build();
+
+ // Constructing TopologySpecMap.
+ Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+ String specExecInstance = "mySpecExecutor";
+ TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+ URI specExecURI = new URI(specExecInstance);
+ topologySpecMap.put(specExecURI, topologySpec);
+ MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null);
+ dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+ this.dagManagementTaskStream =
+ new DagManagementTaskStreamImpl(config, Optional.empty(),
dagManagementStateStore);
+ this.dagManagementTaskStream.setActive(true);
+ this.dagProcFactory = new DagProcFactory();
+ this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
+ this.dagManagementTaskStream, this.dagProcFactory,
dagManagementStateStore);
+ this.dagTaskStream = spy(new MockedDagTaskStream());
+ this.dagProcessingEngine =
+ new DagProcessingEngine(config, dagTaskStream, this.dagProcFactory,
dagManagementStateStore);
+ }
+
+ static class MockedDagTaskStream implements DagTaskStream {
+ public static final int MAX_NUM_OF_TASKS = 10;
+ volatile int i=0;
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public synchronized DagTask next() {
+ i++;
+ if (i <= MAX_NUM_OF_TASKS) {
+ return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-"
+ i, "1234" + i, DagActionStore.FlowActionType.LAUNCH), null);
+ } else {
+ throw new RuntimeException("Max num of tasks reached");
+ }
+ }
+ }
+
+ static class MockedDagTask extends DagTask<MockedDagProc> {
+
+ public MockedDagTask(DagActionStore.DagAction dagAction,
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ super(dagAction, leaseObtainedStatus);
+ }
+
+ @Override
+ public MockedDagProc host(DagTaskVisitor<MockedDagProc> visitor) {
+ return new MockedDagProc();
+ }
+
+ @Override
+ public boolean conclude() throws IOException {
+ return false;
+ }
+ }
+
+ static class MockedDagProc extends DagProc<Void, Void> {
+
+ @Override
+ protected Void initialize(DagManagementStateStore dagManagementStateStore)
{
+ return null;
+ }
+
+ @Override
+ protected Void act(DagManagementStateStore dagManagementStateStore, Void
state) {
+ return null;
+ }
+
+ @Override
+ protected void sendNotification(Void result, EventSubmitter
eventSubmitter) {
+ }
+
+ @Override
+ protected void commit(DagManagementStateStore dagManagementStateStore,
Void result) {
+ }
+ }
+
+ // This tests verifies that
+ @Test
+ public void dagProcessingTest() throws InterruptedException,
TimeoutException {
+ int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS +
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;
Review Comment:
Actually, don't we expect 2x that number of invocations, given that both
`hasNext()` and `next()` are invoked on the stream?
Issue Time Tracking
-------------------
Worklog Id: (was: 909017)
Time Spent: 31h (was: 30h 50m)
> Refactor code to move current in-memory references to new design for REST
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1910
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 31h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)