[
https://issues.apache.org/jira/browse/GOBBLIN-2136?focusedWorklogId=933869&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-933869
]
ASF GitHub Bot logged work on GOBBLIN-2136:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 09/Sep/24 21:00
Start Date: 09/Sep/24 21:00
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4041:
URL: https://github.com/apache/gobblin/pull/4041#discussion_r1750642724
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json:
##########
@@ -264,11 +264,14 @@
"deprecated" : "Use FlowExecution instead"
}, "org.apache.gobblin.service.FlowStatusId",
"org.apache.gobblin.service.Issue", "org.apache.gobblin.service.IssueSeverity",
"org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState",
"org.apache.gobblin.service.JobStatistics",
"org.apache.gobblin.service.JobStatus", "org.apache.gobblin.service.Timestamp"
],
"schema" : {
+ "annotations" : {
+ "deprecated" : { }
+ },
"name" : "flowstatuses",
"namespace" : "org.apache.gobblin.service",
"path" : "/flowstatuses",
"schema" : "org.apache.gobblin.service.FlowStatus",
- "doc" : "Resource for handling flow status requests\n\ngenerated from:
org.apache.gobblin.service.FlowStatusResource",
+ "doc" : "Resource for handling flow status requests. Deprecated, use
{@link FlowExecutionResource}\n\ngenerated from:
org.apache.gobblin.service.FlowStatusResource",
Review Comment:
same advice here
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowExecutionResourceHandler.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.inject.Inject;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.RestLiServiceException;
+import com.linkedin.restli.server.UpdateResponse;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.FlowExecution;
+import org.apache.gobblin.service.FlowExecutionResource;
+import org.apache.gobblin.service.FlowExecutionResourceHandlerInterface;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.FlowStatusId;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.monitoring.FlowStatus;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+
+
+@Slf4j
+public class FlowExecutionResourceHandler implements
FlowExecutionResourceHandlerInterface {
Review Comment:
javadoc?
also, the diff isn't showing this as a rename, but is that what it is?
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json:
##########
@@ -1,9 +1,12 @@
{
+ "annotations" : {
+ "deprecated" : { }
+ },
"name" : "flowstatuses",
"namespace" : "org.apache.gobblin.service",
"path" : "/flowstatuses",
"schema" : "org.apache.gobblin.service.FlowStatus",
- "doc" : "Resource for handling flow status requests\n\ngenerated from:
org.apache.gobblin.service.FlowStatusResource",
+ "doc" : "Resource for handling flow status requests. Deprecated, use {@link
FlowExecutionResource}\n\ngenerated from:
org.apache.gobblin.service.FlowStatusResource",
Review Comment:
suggest to announce deprecation first:
```
Deprecated, use {@link FlowExecutionResource}\n\nResource for handling flow
status requests\n\ngenerated from:
org.apache.gobblin.service.FlowStatusResource",
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -289,7 +280,7 @@ private void registerServicesInLauncher(){
private void configureServices(){
if (configuration.isRestLIServerEnabled()) {
this.restliServer = EmbeddedRestliServer.builder()
- .resources(Lists.newArrayList(FlowConfigsResource.class,
FlowConfigsV2Resource.class))
+ .resources(Lists.newArrayList(FlowConfigsV2Resource.class,
FlowConfigsV2Resource.class))
Review Comment:
can we list just once?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/FlowExecutionTest.java:
##########
@@ -30,29 +30,28 @@
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
-import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
-import com.google.inject.Module;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.restli.FlowExecutionResourceHandler;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import static org.mockito.Mockito.mock;
@Test(groups = { "gobblin.service" }, singleThreaded = true)
-public class FlowStatusTest {
- private FlowStatusClient _client;
+public class FlowExecutionTest {
Review Comment:
`FlowExecutionClientSideTest`?
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -60,14 +63,22 @@
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.FlowConfig;
+import org.apache.gobblin.service.FlowConfigLoggedException;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.FlowStatusId;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.Schedule;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
-/**
- * A {@link FlowConfigsResourceHandler} that handles Restli locally.
- */
@Slf4j
-public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandler {
+public class FlowConfigsV2ResourceHandler {
Review Comment:
why not implement a `FlowConfigsV2ResourceHandlerInterface`, like
flowExecutions does?
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java:
##########
@@ -29,6 +29,7 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler;
public class FlowConfigResourceLocalHandlerTest {
Review Comment:
shouldn't this be renamed to V2? also, hasn't the "resource local handler"
naming convention also gone away?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.TimeUnit;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Tests the state updates (including updating in-memory state and
MysqlDagActionStore) after performing add or cancel
+ * operations by calling addDag, stopDag, kill, and resume. It also tests
flows with and without sla configs.
+ */
+public class DagFlowTest {
+
+ @Test
+ void slaConfigCheck() throws Exception {
+ Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("5", 123456783L,
"FINISH_RUNNING", 1);
+
Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)),
ServiceConfigKeys.DEFAULT_FLOW_FINISH_DEADLINE_MILLIS);
+
+ Config jobConfig =
dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+ jobConfig = jobConfig
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef("7"))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name()));
+ dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
+
Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)),
TimeUnit.SECONDS.toMillis(7L));
+
+ jobConfig = jobConfig
Review Comment:
can we use a fresh name, like `jobConfig2`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -114,7 +114,6 @@ Metrics need to be created before initializeMonitor() below
is called (or more s
protected void assignTopicPartitions() {
// Expects underlying consumer to handle initializing partitions and
offset for the topic -
// subscribe to all partitions from latest offset
Review Comment:
I'm a bit fuzzy on this (and wondering whether it should be an `abstract`
method).
if the empty but concrete impl is actually what we want, is it possible to
name the specific method involved in the "underlying consumer"?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.TimeUnit;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Tests the state updates (including updating in-memory state and
MysqlDagActionStore) after performing add or cancel
+ * operations by calling addDag, stopDag, kill, and resume. It also tests
flows with and without sla configs.
+ */
+public class DagFlowTest {
+
+ @Test
+ void slaConfigCheck() throws Exception {
Review Comment:
this javadoc talks about a lot, but it seems mostly like just testing
`DagUtils.getFlowFinishDeadline`. is there more than that, which I'm not
grasping?
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandlerTest.java:
##########
@@ -17,50 +17,49 @@
package org.apache.gobblin.service;
-
import org.testng.Assert;
import org.testng.annotations.Test;
public class FlowExecutionResourceLocalHandlerTest {
Review Comment:
let's rename, since the `FlowExecutionResourceLocalHandler` looks to have
been deleted
Issue Time Tracking
-------------------
Worklog Id: (was: 933869)
Time Spent: 7h 40m (was: 7.5h)
> remove obsolete code related to DagManager
> ------------------------------------------
>
> Key: GOBBLIN-2136
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2136
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 7h 40m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)