[
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=882974&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-882974
]
ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 02/Oct/23 18:27
Start Date: 02/Oct/23 18:27
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1342955936
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
Review Comment:
nit: strike "An interface to provide"
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios
for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+ /**
+ * Currently, it is handling just the launch of a {@link Dag} request via
REST client for adhoc flows.
+ * The eventTimestamp for adhoc flows will always be 0 (zero), while -1
would indicate failures.
+ * Essentially for a valid launch flow, the eventTimestamp needs to be >= 0.
+ * Future implementations will cover launch of flows through the scheduler
too!
Review Comment:
rather than documenting this to be merely informative, could we instead
rephrase it as an interface requirement/policy?
(I'm uncertain about your overall plan for the impl's validation, so
this is based solely on reading the javadoc in isolation...)
e.g. throws `IllegalArgumentException` when `eventTimestamp < 0`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.CleanUpDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.ResumeDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given
{@link DagTask}.
+ */
+
+@Alpha
+@Slf4j
+public class DagProcFactory implements DagTaskVisitor<DagProc> {
+
+ private DagManagementStateStore dagManagementStateStore;
+ private JobStatusRetriever jobStatusRetriever;
+ private FlowStatusGenerator flowStatusGenerator;
+ private UserQuotaManager quotaManager;
+ private SpecCompiler specCompiler;
+ private FlowCatalog flowCatalog;
+ private FlowCompilationValidationHelper flowCompilationValidationHelper;
+ private Config config;
+ private Optional<EventSubmitter> eventSubmitter;
+ private boolean instrumentationEnabled;
+
+ public DagProcFactory(Config config, DagManagementStateStore
dagManagementStateStore, JobStatusRetriever jobStatusRetriever,
+ FlowStatusGenerator flowStatusGenerator, UserQuotaManager quotaManager,
SpecCompiler specCompiler, FlowCatalog flowCatalog,
FlowCompilationValidationHelper flowCompilationValidationHelper, boolean
instrumentationEnabled) {
+
+ this.config = config;
+ this.dagManagementStateStore = dagManagementStateStore;
+ this.jobStatusRetriever = jobStatusRetriever;
+ this.flowStatusGenerator = flowStatusGenerator;
+ this.quotaManager = quotaManager;
+ this.specCompiler = specCompiler;
+ this.flowCatalog = flowCatalog;
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
+ this.instrumentationEnabled = instrumentationEnabled;
Review Comment:
these look like they could be `private final`... is that true?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios
for launch, resume, kill, job start
+ * and flow completion deadlines
Review Comment:
rather than "responsible for *defining the behavior*...", this seems more like
the means by which such tasks are triggered.
that said, it may be too much impl. detail to mention `DagTask` in the javadoc,
since I don't see it actually used in the code here
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+ public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+ public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan>
dagNode);
+
+ public boolean hasRunningJobs(String dagId);
+
+ public void removeDagActionFromStore(DagManager.DagId dagId,
DagActionStore.FlowActionType flowActionType) throws IOException;
+
+ public void addDagSLA(String dagId, Long flowSla);
+
+ public Long getDagSLA(String dagId);
+
+ public Dag<JobExecutionPlan> getDag(String dagId);
+
+ public LinkedList<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId)
throws IOException;
+
+ public boolean addFailedDagId(String dagId);
+
+ public boolean checkFailedDagId(String dagId);
Review Comment:
unclear: what is meant by "check" - for membership/existence?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc}
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+ public static final String NUM_THREADS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+ public static final String JOB_STATUS_POLLING_INTERVAL_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+ public static final String MAX_RETRY_ATTEMPTS =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+ public static final String RETRY_DELAY_MS =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+ private static final Integer DEFAULT_NUM_THREADS = 3;
+ private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+ private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+ private static final long DEFAULT_RETRY_DELAY_MS = 1000;
+
+ private final DagTaskStream dagTaskStream;
+ private final DagProcFactory dagProcFactory;
+ private final DagManagementStateStore dagManagementStateStore;
+ private final ScheduledExecutorService scheduledExecutorPool;
+ private final MetricContext metricContext;
+ private final Optional<EventSubmitter> eventSubmitter;
+ private final Config config;
+ private final Integer numThreads;
+ private final Integer pollingInterval;
+ private final Integer maxRetryAttempts;
+ private final long delayRetryMillis;
+ private final DagProcessingEngine.Thread [] threads;
+
+
+ public DagProcessingEngine(Config config, DagTaskStream dagTaskStream,
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore,
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+ this.config = config;
+ this.dagTaskStream = dagTaskStream;
+ this.dagProcFactory = dagProcFactory;
+ this.dagManagementStateStore = dagManagementStateStore;
+ metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
+ this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+
+ this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY,
DEFAULT_NUM_THREADS);
+ this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+ this.pollingInterval = ConfigUtils.getInt(config,
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+ this.maxRetryAttempts = ConfigUtils.getInt(config, MAX_RETRY_ATTEMPTS,
DEFAULT_MAX_RETRY_ATTEMPTS);
+ this.delayRetryMillis = ConfigUtils.getLong(config, RETRY_DELAY_MS,
DEFAULT_RETRY_DELAY_MS);
+ this.threads = new DagProcessingEngine.Thread[this.numThreads];
+ for(int i=0; i < this.numThreads; i++) {
+ Thread thread = new Thread(dagTaskStream, dagProcFactory,
dagManagementStateStore, eventSubmitter, this.maxRetryAttempts,
this.delayRetryMillis);
+ this.threads[i] = thread;
+ }
+ }
+
+
+ @AllArgsConstructor
+ private static class Thread implements Runnable {
+
+ private DagTaskStream dagTaskStream;
+ private DagProcFactory dagProcFactory;
+ private DagManagementStateStore dagManagementStateStore;
+ private Optional<EventSubmitter> eventSubmitter;
+ private Integer maxRetryAttempts;
+ private long delayRetryMillis;
Review Comment:
`final`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc}
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+ public static final String NUM_THREADS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+ public static final String JOB_STATUS_POLLING_INTERVAL_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+ public static final String MAX_RETRY_ATTEMPTS =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+ public static final String RETRY_DELAY_MS =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+ private static final Integer DEFAULT_NUM_THREADS = 3;
+ private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+ private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+ private static final long DEFAULT_RETRY_DELAY_MS = 1000;
+
+ private final DagTaskStream dagTaskStream;
+ private final DagProcFactory dagProcFactory;
+ private final DagManagementStateStore dagManagementStateStore;
+ private final ScheduledExecutorService scheduledExecutorPool;
+ private final MetricContext metricContext;
+ private final Optional<EventSubmitter> eventSubmitter;
+ private final Config config;
+ private final Integer numThreads;
+ private final Integer pollingInterval;
+ private final Integer maxRetryAttempts;
+ private final long delayRetryMillis;
+ private final DagProcessingEngine.Thread [] threads;
+
+
+ public DagProcessingEngine(Config config, DagTaskStream dagTaskStream,
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore,
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+ this.config = config;
+ this.dagTaskStream = dagTaskStream;
+ this.dagProcFactory = dagProcFactory;
+ this.dagManagementStateStore = dagManagementStateStore;
+ metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
+ this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+
+ this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY,
DEFAULT_NUM_THREADS);
+ this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+ this.pollingInterval = ConfigUtils.getInt(config,
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+ this.maxRetryAttempts = ConfigUtils.getInt(config, MAX_RETRY_ATTEMPTS,
DEFAULT_MAX_RETRY_ATTEMPTS);
+ this.delayRetryMillis = ConfigUtils.getLong(config, RETRY_DELAY_MS,
DEFAULT_RETRY_DELAY_MS);
+ this.threads = new DagProcessingEngine.Thread[this.numThreads];
+ for(int i=0; i < this.numThreads; i++) {
+ Thread thread = new Thread(dagTaskStream, dagProcFactory,
dagManagementStateStore, eventSubmitter, this.maxRetryAttempts,
this.delayRetryMillis);
+ this.threads[i] = thread;
+ }
+ }
+
+
+ @AllArgsConstructor
+ private static class Thread implements Runnable {
+
+ private DagTaskStream dagTaskStream;
+ private DagProcFactory dagProcFactory;
+ private DagManagementStateStore dagManagementStateStore;
+ private Optional<EventSubmitter> eventSubmitter;
+ private Integer maxRetryAttempts;
+ private long delayRetryMillis;
+
+ @Override
+ public void run() {
+ try {
+
+ for(DagTask dagTask : dagTaskStream) {
Review Comment:
nit: extra blank line after `try {`, and a missing space between `for` and `(` (should be `for (`)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+
+
+/**
+ * Interface defining {@link DagTask} based on the type of visitor.
+ * @param <DagProc>
+ */
+@Alpha
+public interface DagTaskVisitor<DagProc> {
Review Comment:
nit: convention would be `T` (or at the most `DagProcType`). naming it
after an actual type (even though not imported) is quite confusing
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc}
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+ public static final String NUM_THREADS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+ public static final String JOB_STATUS_POLLING_INTERVAL_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+ public static final String MAX_RETRY_ATTEMPTS =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+ public static final String RETRY_DELAY_MS =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+ private static final Integer DEFAULT_NUM_THREADS = 3;
+ private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+ private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+ private static final long DEFAULT_RETRY_DELAY_MS = 1000;
+
+ private final DagTaskStream dagTaskStream;
+ private final DagProcFactory dagProcFactory;
+ private final DagManagementStateStore dagManagementStateStore;
+ private final ScheduledExecutorService scheduledExecutorPool;
+ private final MetricContext metricContext;
+ private final Optional<EventSubmitter> eventSubmitter;
+ private final Config config;
+ private final Integer numThreads;
+ private final Integer pollingInterval;
+ private final Integer maxRetryAttempts;
+ private final long delayRetryMillis;
+ private final DagProcessingEngine.Thread [] threads;
+
+
+ public DagProcessingEngine(Config config, DagTaskStream dagTaskStream,
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore,
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+ this.config = config;
+ this.dagTaskStream = dagTaskStream;
+ this.dagProcFactory = dagProcFactory;
+ this.dagManagementStateStore = dagManagementStateStore;
+ metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
+ this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+
+ this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY,
DEFAULT_NUM_THREADS);
+ this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+ this.pollingInterval = ConfigUtils.getInt(config,
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+ this.maxRetryAttempts = ConfigUtils.getInt(config, MAX_RETRY_ATTEMPTS,
DEFAULT_MAX_RETRY_ATTEMPTS);
+ this.delayRetryMillis = ConfigUtils.getLong(config, RETRY_DELAY_MS,
DEFAULT_RETRY_DELAY_MS);
+ this.threads = new DagProcessingEngine.Thread[this.numThreads];
+ for(int i=0; i < this.numThreads; i++) {
+ Thread thread = new Thread(dagTaskStream, dagProcFactory,
dagManagementStateStore, eventSubmitter, this.maxRetryAttempts,
this.delayRetryMillis);
+ this.threads[i] = thread;
+ }
+ }
+
+
+ @AllArgsConstructor
+ private static class Thread implements Runnable {
+
+ private DagTaskStream dagTaskStream;
+ private DagProcFactory dagProcFactory;
+ private DagManagementStateStore dagManagementStateStore;
+ private Optional<EventSubmitter> eventSubmitter;
+ private Integer maxRetryAttempts;
+ private long delayRetryMillis;
+
+ @Override
+ public void run() {
+ try {
+
+ for(DagTask dagTask : dagTaskStream) {
+ DagProc dagProc = dagTask.host(dagProcFactory);
+ dagProc.process(dagManagementStateStore, eventSubmitter.get(),
maxRetryAttempts, delayRetryMillis);
+ //marks lease success and releases it
+ dagTaskStream.complete(dagTask);
Review Comment:
first off, I absolutely LOVE how simple this impl is--even as it segregates
responsibility elsewhere for deciding **how** to process each specific task
type.
that said, I wonder whether we might go further and encapsulate the
`complete`/commit.
e.g. the stream could construct each task w/ a:
```
@FunctionalInterface
interface Committer {
void commit();
}
```
that would internally store the lease. the `DagProcFactory` would then take
that from the task and preserve it in the proc, so the last step of the proc's
`process` could call it
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -108,7 +108,7 @@ static DagManager.DagId generateDagId(String flowGroup,
String flowName, long fl
return generateDagId(flowGroup, flowName, String.valueOf(flowExecutionId));
}
- static DagManager.DagId generateDagId(String flowGroup, String flowName,
String flowExecutionId) {
+ public static DagManager.DagId generateDagId(String flowGroup, String
flowName, String flowExecutionId) {
Review Comment:
I see... if already widely used, no need to rip out a method you did not
add. but if only once or twice, I suggest getting rid of it.
contrary to your reply, I do not observe this impl converting between
`String` and `long`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * A {@link DagTask} responsible to handle kill tasks.
+ */
+@Alpha
+public class KillDagTask extends DagTask {
+
+ @Getter
+ private final DagManager.DagId killDagId;
+ protected final DagActionStore.DagAction killAction;
+
+ public KillDagTask(DagActionStore.DagAction killAction,
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+
+ this.killAction = killAction;
+ this.leaseObtainedStatusStatus = leaseObtainedStatus;
Review Comment:
should this be initialized by a call to `super()`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state,
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+ abstract protected S initialize(DagManagementStateStore
dagManagementStateStore) throws MaybeRetryableException, IOException;
+ abstract protected R act(S state, DagManagementStateStore
dagManagementStateStore) throws MaybeRetryableException, Exception;
+ abstract protected void sendNotification(R result, EventSubmitter
eventSubmitter) throws MaybeRetryableException, IOException;
+
+ public final void process(DagManagementStateStore dagManagementStateStore,
EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
+ try {
+ S state = this.initializeWithRetries(dagManagementStateStore,
maxRetryCount, delayRetryMillis);
+ R result = this.actWithRetries(state, dagManagementStateStore,
maxRetryCount, delayRetryMillis); // may be pass state store too here
+ this.sendNotificationWithRetries(result, eventSubmitter, maxRetryCount,
delayRetryMillis);
+ log.info("Successfully processed Dag Request");
+ } catch (Exception ex) {
+ throw new RuntimeException("Cannot process Dag Request: ", ex);
Review Comment:
I use checked exceptions sparingly, but here, one may actually make sense
(rather than `RuntimeException`)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+ public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+ public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan>
dagNode);
+
+ public boolean hasRunningJobs(String dagId);
+
+ public void removeDagActionFromStore(DagManager.DagId dagId,
DagActionStore.FlowActionType flowActionType) throws IOException;
+
+ public void addDagSLA(String dagId, Long flowSla);
+
+ public Long getDagSLA(String dagId);
Review Comment:
don't we have two kinds? also, since this has never been an SLA, but rather
a **deadline**, would now be an opportunity to adopt the accurate nomenclature?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc}
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+ public static final String NUM_THREADS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+ public static final String JOB_STATUS_POLLING_INTERVAL_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+ public static final String MAX_RETRY_ATTEMPTS =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+ public static final String RETRY_DELAY_MS =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
Review Comment:
I agree w/ a key, like 'xyz.enabled'... but don't believe that should do
double duty as the prefix for other configs. let's define `PREFIX = "xyz"` and
then have `PREFIX + ".enabled"` merely be a stand-alone key
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state,
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+ abstract protected S initialize(DagManagementStateStore
dagManagementStateStore) throws MaybeRetryableException, IOException;
+ abstract protected R act(S state, DagManagementStateStore
dagManagementStateStore) throws MaybeRetryableException, Exception;
+ abstract protected void sendNotification(R result, EventSubmitter
eventSubmitter) throws MaybeRetryableException, IOException;
+
+ public final void process(DagManagementStateStore dagManagementStateStore,
EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
Review Comment:
eventually, retries may not be governed by a static count, but rather by
tracking how much time remains in the lease
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc}
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+ public static final String NUM_THREADS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+ public static final String JOB_STATUS_POLLING_INTERVAL_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+ public static final String MAX_RETRY_ATTEMPTS =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+ public static final String RETRY_DELAY_MS =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+ private static final Integer DEFAULT_NUM_THREADS = 3;
+ private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+ private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+ private static final long DEFAULT_RETRY_DELAY_MS = 1000;
+
+ private final DagTaskStream dagTaskStream;
+ private final DagProcFactory dagProcFactory;
+ private final DagManagementStateStore dagManagementStateStore;
+ private final ScheduledExecutorService scheduledExecutorPool;
+ private final MetricContext metricContext;
+ private final Optional<EventSubmitter> eventSubmitter;
+ private final Config config;
+ private final Integer numThreads;
+ private final Integer pollingInterval;
+ private final Integer maxRetryAttempts;
+ private final long delayRetryMillis;
+ private final DagProcessingEngine.Thread [] threads;
+
+
+ public DagProcessingEngine(Config config, DagTaskStream dagTaskStream,
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore,
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+ this.config = config;
+ this.dagTaskStream = dagTaskStream;
+ this.dagProcFactory = dagProcFactory;
+ this.dagManagementStateStore = dagManagementStateStore;
+ metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
+ this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+
+ this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY,
DEFAULT_NUM_THREADS);
+ this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+ this.pollingInterval = ConfigUtils.getInt(config,
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+ this.maxRetryAttempts = ConfigUtils.getInt(config, MAX_RETRY_ATTEMPTS,
DEFAULT_MAX_RETRY_ATTEMPTS);
+ this.delayRetryMillis = ConfigUtils.getLong(config, RETRY_DELAY_MS,
DEFAULT_RETRY_DELAY_MS);
+ this.threads = new DagProcessingEngine.Thread[this.numThreads];
+ for(int i=0; i < this.numThreads; i++) {
+ Thread thread = new Thread(dagTaskStream, dagProcFactory,
dagManagementStateStore, eventSubmitter, this.maxRetryAttempts,
this.delayRetryMillis);
+ this.threads[i] = thread;
+ }
+ }
+
+
+ @AllArgsConstructor
+ private static class Thread implements Runnable {
+
+ private DagTaskStream dagTaskStream;
+ private DagProcFactory dagProcFactory;
+ private DagManagementStateStore dagManagementStateStore;
+ private Optional<EventSubmitter> eventSubmitter;
+ private Integer maxRetryAttempts;
+ private long delayRetryMillis;
+
+ @Override
+ public void run() {
+ try {
+
+ for(DagTask dagTask : dagTaskStream) {
+ DagProc dagProc = dagTask.host(dagProcFactory);
+ dagProc.process(dagManagementStateStore, eventSubmitter.get(),
maxRetryAttempts, delayRetryMillis);
+ //marks lease success and releases it
+ dagTaskStream.complete(dagTask);
+ }
+ } catch (Exception ex) {
+ //TODO: need to handle exceptions gracefully
+ log.error(String.format("Exception encountered in %s",
getClass().getName()), ex);
+ throw new RuntimeException(ex);
+ }
Review Comment:
yes, BIG TIME TODO! (RN, looks like just one exception would crash this thread!)
suggestion: borrow from the current `DagManagerThread` handling
Issue Time Tracking
-------------------
Worklog Id: (was: 882974)
Time Spent: 12h (was: 11h 50m)
> Refactor code to move current in-memory references to new design for REST
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1910
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 12h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)