abhishekmjain commented on code in PR #4136:
URL: https://github.com/apache/gobblin/pull/4136#discussion_r2300178657
##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java:
##########
@@ -96,6 +97,8 @@ public class ServiceMetricNames {
public static final String DAG_ACTIONS_ACT_FAILED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed.";
public static final String DAG_ACTIONS_ACT_SUCCEEDED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded.";
public static final String DAG_ACTIONS_CONCLUDE_FAILED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed.";
+ public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED
= DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalSucceeded";
Review Comment:
I see a `.` at the end of each metric name, is it not needed for the new
ones we add?
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -248,8 +251,22 @@ public CreateKVResponse<ComplexResourceKey<FlowId,
FlowStatusId>, FlowConfig> cr
// Return conflict and take no action if flowSpec has already been created
if (this.flowCatalog.exists(flowSpec.getUri())) {
log.warn("FlowSpec with URI {} already exists, no action will be taken",
flowSpec.getUri());
- return new CreateKVResponse<>(new
RestLiServiceException(HttpStatus.S_409_CONFLICT,
- "FlowSpec with URI " + flowSpec.getUri() + " already exists, no
action will be taken"));
+ try {
+ FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri());
+ if (!storedFlowSpec.isScheduled()) {
+ log.error("FlowSpec Already Exists As Adhoc Flow with URI: " +
flowSpec.getUri());
Review Comment:
This is not an error scenario for service, there could be genuine multiple
attempts. WARN should be fine IMO.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
@@ -163,6 +167,14 @@ public void
markDagActionsConclude(DagActionStore.DagActionType dagActionType, b
}
}
+ public void
markDagActionsConflowFlowSpecRemoval(DagActionStore.DagActionType
dagActionType, boolean succeeded) {
+ if (succeeded) {
+
updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType,
dagActionType);
+ } else {
Review Comment:
is succeeded scenario going to be called?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
@@ -163,6 +167,14 @@ public void
markDagActionsConclude(DagActionStore.DagActionType dagActionType, b
}
}
+ public void
markDagActionsConflowFlowSpecRemoval(DagActionStore.DagActionType
dagActionType, boolean succeeded) {
Review Comment:
`markDagActionsConflowFlowSpecRemoval` - is conflow a typo?
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -100,6 +101,8 @@ public
FlowConfigsV2ResourceHandler(@Named(InjectionNames.SERVICE_NAME) String s
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.DELETE_FLOW_METER));
this.runImmediatelyFlow = metricContext.contextAwareMeter(
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER));
+ this.flowSpecExistsForAdhocFlow = metricContext.contextAwareMeter(
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames. RUN_IMMEDIATELY_FLOW_METER));
Review Comment:
why are we reusing `RUN_IMMEDIATELY_FLOW_METER`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
@@ -89,6 +91,8 @@ public void registerAllMetrics() {
registerMetricForEachDagActionType(this.dagActionsActSucceededMeterByDagActionType,
ServiceMetricNames.DAG_ACTIONS_ACT_SUCCEEDED);
registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType,
ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED);
registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType,
ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED);
+
registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType,
ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED);
Review Comment:
meter spelling is incorrect
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]