[
https://issues.apache.org/jira/browse/GOBBLIN-1542?focusedWorklogId=652048&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-652048
]
ASF GitHub Bot logged work on GOBBLIN-1542:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 17/Sep/21 00:55
Start Date: 17/Sep/21 00:55
Worklog Time Spent: 10m
Work Description: autumnust commented on a change in pull request #3393:
URL: https://github.com/apache/gobblin/pull/3393#discussion_r710587332
##########
File path:
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
##########
@@ -269,6 +271,62 @@ protected void executeCancellation() {
}
}
+ protected void removeTaskFromRunningHelixJob(List<String> workUnitIdsToRemove) throws IOException {
Review comment:
Could you check whether the line length is within 120 characters?
##########
File path:
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
##########
@@ -89,8 +89,8 @@ void testJobShouldGetCancelled() throws Exception {
IntegrationJobCancelSuite.TASK_STATE_FILE)
.withValue(SleepingTask.SLEEP_TIME_IN_SECONDS,
ConfigValueFactory.fromAnyRef(100));
this.suite = new IntegrationJobCancelSuite(jobConfigOverrides);
- HelixManager helixManager = getHelixManager();
suite.startCluster();
+ HelixManager helixManager = getHelixManager();
Review comment:
Is this change relevant?
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -259,6 +263,25 @@ public static void setDefaultAuthenticator(Properties properties) {
}
}
+ /**
+ * Handle {@link WorkUnitChangeEvent}, by default it will donothing
Review comment:
donothing -> do nothing
##########
File path:
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
##########
@@ -204,7 +204,7 @@ public void testStartError() throws Exception{
Assert.assertEquals(this.expiredYarnService.getNumRequestedContainers(), 10);
try {
- Thread.sleep(20000);
+ Thread.sleep(60000);
Review comment:
It is usually not good practice to use `Thread.sleep` in a unit test unless necessary (and 60 seconds is really long considering the number of unit tests we have). Why is this needed? Is there another way to achieve the same thing? There's a test utility called `AssertWithBackoff` you might want to look into, if what you want after the sleep is for a certain condition to be satisfied.
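A minimal sketch of the pattern suggested above, assuming the test only needs to wait until some condition holds. `BackoffWait` is a hypothetical helper written for illustration; it is not the API of Gobblin's `AssertWithBackoff`, whose actual signature should be checked before use. It polls with exponential backoff and returns as soon as the condition is true, so the common case finishes quickly while the timeout still bounds the worst case:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not Gobblin's AssertWithBackoff API): wait for a condition with backoff. */
final class BackoffWait {
  private BackoffWait() {}

  /**
   * Polls {@code condition} until it returns true or {@code timeout} elapses.
   * The delay between polls starts at {@code initialDelay} and doubles, capped at {@code maxDelay}.
   * Returns true if the condition was met; false on timeout or interruption.
   */
  static boolean waitFor(BooleanSupplier condition, Duration timeout,
      Duration initialDelay, Duration maxDelay) {
    long deadline = System.nanoTime() + timeout.toNanos();
    long delayMs = Math.max(1, initialDelay.toMillis());
    long maxDelayMs = Math.max(delayMs, maxDelay.toMillis());
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      long remainingMs = (deadline - System.nanoTime()) / 1_000_000;
      if (remainingMs <= 0) {
        return false;
      }
      try {
        // Never sleep past the deadline; the loop re-checks the condition once more afterwards.
        Thread.sleep(Math.min(delayMs, remainingMs));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
      delayMs = Math.min(delayMs * 2, maxDelayMs);
    }
  }
}
```

In the YARN test this would wrap whatever the 60-second sleep is actually waiting for, e.g. `Assert.assertTrue(BackoffWait.waitFor(() -> <condition>, Duration.ofSeconds(60), Duration.ofMillis(500), Duration.ofSeconds(5)))`, so a passing run no longer pays the full minute.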
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -259,6 +263,25 @@ public static void setDefaultAuthenticator(Properties properties) {
}
}
+ /**
+ * Handle {@link WorkUnitChangeEvent}, by default it will donothing
+ */
+ @Subscribe
+ public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
+ throws InvocationTargetException {
+ LOG.info("start to handle workunit change event");
+ try {
+ this.removeTaskFromRunningHelixJob(workUnitChangeEvent.getOldTaskIds());
+ this.addTaskToRunningHelixJob(workUnitChangeEvent.getNewWorkUnits());
+ } catch (Exception e) {
+ //todo: emit some event to indicate there is an error handling this event that may cause starvation
+ throw new InvocationTargetException(e);
+ }
+ }
+
+ protected void removeTaskFromRunningHelixJob(List<String> taskIdsToRemove) throws IOException {}
Review comment:
`removeTasksFromHelixJob`?
##########
File path:
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
##########
@@ -145,6 +146,31 @@ static void waitJobInitialization(
log.info("Work flow {} initialized", workFlowName);
}
+ public static boolean deleteTaskFromHelixJob(String workFlowName,
+ String jobName, String taskID, TaskDriver helixTaskDriver) {
+ try {
+ log.info(String.format("try to delete task %s from workflow %s, job %s", taskID, workFlowName, jobName));
+ helixTaskDriver.deleteTask(workFlowName, jobName, taskID);
+ } catch (Exception e) {
+ e.printStackTrace();
Review comment:
Why handle it this way? Why not propagate the exception to the caller and let it decide, since this is just a utility method?
##########
File path:
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
##########
@@ -145,6 +146,31 @@ static void waitJobInitialization(
log.info("Work flow {} initialized", workFlowName);
}
+ public static boolean deleteTaskFromHelixJob(String workFlowName,
Review comment:
Is package-private static good enough?
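A minimal sketch of what these two suggestions could look like together, assuming the caller wants failures as an `IOException` (the type the launcher methods already declare). `TaskDeleter` is a hypothetical single-method stand-in for the relevant part of Helix's `TaskDriver`, used only so the sketch compiles on its own; it is not the Helix API. The utility adds context and rethrows instead of calling `printStackTrace()`, and it is package-private because only code in the same package calls it:

```java
import java.io.IOException;

/** Hypothetical stand-in for the task-deletion part of Helix's TaskDriver (not the Helix API). */
@FunctionalInterface
interface TaskDeleter {
  void deleteTask(String workflow, String job, String taskId);
}

final class HelixUtilsSketch {
  private HelixUtilsSketch() {}

  /**
   * Package-private: only launcher code in the same package needs it.
   * Failures are wrapped with context and propagated, so the caller decides whether to retry or abort.
   */
  static void deleteTaskFromHelixJob(String workflow, String job, String taskId, TaskDeleter deleter)
      throws IOException {
    try {
      deleter.deleteTask(workflow, job, taskId);
    } catch (RuntimeException e) {
      throw new IOException(
          String.format("Failed to delete task %s from workflow %s, job %s", taskId, workflow, job), e);
    }
  }
}
```

Keeping the original exception as the cause preserves the full stack trace in the caller's logs, which `printStackTrace()` sends to stderr instead.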
##########
File path:
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
##########
@@ -269,6 +271,62 @@ protected void executeCancellation() {
}
}
+ protected void removeTaskFromRunningHelixJob(List<String> workUnitIdsToRemove) throws IOException {
+ String jobName = this.jobContext.getJobId();
+ try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
+ for (String workUnitId : workUnitIdsToRemove) {
+ int helixOperationRetryTime = 3;
+ while (helixOperationRetryTime > 0) {
+ String taskId = workUnitToHelixConfig.get(workUnitId).getId();
+ boolean remove = HelixUtils.deleteTaskFromHelixJob(this.helixWorkFlowName, jobName, taskId, this.helixTaskDriver);
+ if (remove) {
+ log.info(String.format("Removed helix task %s with gobblin task id %s from helix job %s:%s ", taskId,
+ workUnitId, this.helixWorkFlowName, jobName));
+ deleteWorkUnitFromStateStore(workUnitId, stateSerDeRunner);
+ log.info(String.format("remove task state for %s in state store", workUnitId));
+ this.workUnitToHelixConfig.remove(workUnitId);
+ break;
+ } else {
+ helixOperationRetryTime--;
+ if (helixOperationRetryTime == 0) {
+ log.error(String.format("Failed to remove task %s from helix job %s:%s ", workUnitId, this.helixWorkFlowName,
+ jobName));
+ throw new IOException(
+ String.format("Cannot remove task %s from helix job %s:%s after 3 times", workUnitId, this.helixWorkFlowName, jobName));
+ }
+ }
+ }
+ }
+ }
+ }
+
+ protected void addTaskToRunningHelixJob(List<WorkUnit> workUnitsToAdd) throws IOException {
Review comment:
There's a lot of duplicated logic between these two methods. What they do seems to be as simple as:
```
for workUnit in workUnits:
  helixTask = getHelixTask(workUnit)
  retryer.execute(helixTask, helixOps)
```
Can you try to refactor them a bit? There's a retry object you could use, or you could create your own and pass in a lambda expression.
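A sketch of the refactoring outlined above, assuming each per-task Helix operation can be expressed as a lambda that returns `true` once the change is confirmed (as `deleteTaskFromHelixJob` does today). `TaskRetryer` and `TaskOperation` are hypothetical names for illustration; the existing retry utility the reviewer mentions may be preferable. The hard-coded "3 times" becomes a constructor parameter, and removal and addition differ only in the list and lambda they pass in:

```java
import java.io.IOException;
import java.util.List;

/** Hypothetical single Helix operation on one task; returns true once the change is confirmed. */
@FunctionalInterface
interface TaskOperation<T> {
  boolean apply(T task) throws IOException;
}

/** Hypothetical retryer shared by the add and remove paths. */
final class TaskRetryer {
  private final int maxAttempts;

  TaskRetryer(int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    this.maxAttempts = maxAttempts;
  }

  /** Applies op to each task, retrying each one up to maxAttempts times; fails on the first task that never succeeds. */
  <T> void applyToAll(List<T> tasks, TaskOperation<T> op) throws IOException {
    for (T task : tasks) {
      IOException lastError = null;
      boolean done = false;
      for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
        try {
          done = op.apply(task);
        } catch (IOException e) {
          lastError = e; // remember the most recent failure and try again
        }
      }
      if (!done) {
        // lastError may be null if every attempt returned false without throwing.
        throw new IOException(
            String.format("Operation on task %s failed after %d attempts", task, maxAttempts), lastError);
      }
    }
  }
}
```

Removal could then pass a lambda that calls `HelixUtils.deleteTaskFromHelixJob(...)` and cleans up the state store on success, and addition a lambda that adds the task, with no hand-rolled `while` loop in either method.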
##########
File path:
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
##########
@@ -22,7 +22,6 @@
"name" : "latestFlowExecution",
"parameters" : [ {
"name" : "flowId",
- "doc" : "Retrieve the most recent matching FlowExecution(s) of the identified FlowId",
Review comment:
Is this change relevant?
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -427,6 +450,13 @@ public void apply(JobListener jobListener, JobContext jobContext)
TimingEvent workUnitsCreationTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
Source<?, ?> source = this.jobContext.getSource();
+ if (source instanceof KafkaSource) {
Review comment:
I think using `instanceof` here leaks the abstraction. AbstractJobLauncher is supposed to contain no implementation-specific constructs, and I would argue that even importing anything from the `org.apache.gobblin.source.extractor.extract.kafka` package should not happen here. Would you reconsider this design? Do you really need the AbstractJobLauncher to be registered?
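One possible shape for such a redesign, sketched under the assumption that the only thing the launcher needs from the source is a notification of replaced and new work units. `WorkUnitChangeListener`, `DynamicSource`, and `LauncherSketch` are hypothetical names, and plain `String` IDs stand in for Gobblin's `WorkUnit` so the example compiles on its own. The listener interface would live in the runtime module, the launcher depends only on that interface (no `instanceof`, no Kafka imports), and the source receives the listener through its constructor instead of an externally set `EventBus`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical runtime-level callback; lives next to the launcher, not in any Kafka package. */
@FunctionalInterface
interface WorkUnitChangeListener {
  void onWorkUnitChange(List<String> oldTaskIds, List<String> newWorkUnitIds);
}

/** Hypothetical source with dynamic work units; the listener is injected once, at construction. */
final class DynamicSource {
  private final WorkUnitChangeListener listener;

  DynamicSource(WorkUnitChangeListener listener) {
    this.listener = Objects.requireNonNull(listener, "listener");
  }

  /** The source only reports the change; the launcher decides how to apply it. */
  void reportChange(List<String> oldTaskIds, List<String> newWorkUnitIds) {
    // Pass read-only copies so the listener cannot mutate the source's own lists.
    listener.onWorkUnitChange(
        Collections.unmodifiableList(new ArrayList<>(oldTaskIds)),
        Collections.unmodifiableList(new ArrayList<>(newWorkUnitIds)));
  }
}

/** Hypothetical launcher: depends only on the interface, never on a concrete source type. */
final class LauncherSketch implements WorkUnitChangeListener {
  final List<String> removed = new ArrayList<>();
  final List<String> added = new ArrayList<>();

  @Override
  public void onWorkUnitChange(List<String> oldTaskIds, List<String> newWorkUnitIds) {
    removed.addAll(oldTaskIds); // in the real launcher: remove these tasks from the Helix job
    added.addAll(newWorkUnitIds); // in the real launcher: add tasks for these work units
  }
}
```

Because the dependency is a constructor argument, it is final and always present, which also avoids the `@Setter`-on-`EventBus` pattern questioned further down in this review.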
##########
File path:
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
##########
@@ -50,16 +50,16 @@ public void setUp()
suite = new IntegrationBasicSuite(jobConfigOverrides);
helixConfig = suite.getManagerConfig();
- String clusterName = helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
- String zkConnectString = helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
- helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "TestManager",
- InstanceType.SPECTATOR, zkConnectString);
}
@Test (groups = {"disabledOnCI"})
//Test disabled on Travis because cluster integration tests are generally flaky on Travis.
public void testExecute() throws Exception {
suite.startCluster();
+ String clusterName = helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
Review comment:
Can you elaborate on why this change is needed?
##########
File path:
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
##########
@@ -145,6 +146,31 @@ static void waitJobInitialization(
log.info("Work flow {} initialized", workFlowName);
}
+ public static boolean deleteTaskFromHelixJob(String workFlowName,
+ String jobName, String taskID, TaskDriver helixTaskDriver) {
+ try {
+ log.info(String.format("try to delete task %s from workflow %s, job %s", taskID, workFlowName, jobName));
+ helixTaskDriver.deleteTask(workFlowName, jobName, taskID);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return !helixTaskDriver.getJobConfig(TaskUtil.getNamespacedJobName(workFlowName, jobName)).getMapConfigs().containsKey(taskID);
Review comment:
Do you know what kind of consistency guarantee Helix offers for deleteTask and addTask? Or rather, what's the intention of the read-after-write pattern here?
##########
File path:
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
##########
@@ -302,7 +302,7 @@ public KafkaStreamingExtractor(WorkUnitState state) {
for (int i = 0; i < numOfPartitions; ++i) {
if (workUnit.getProp(topicNameProp, null) == null) {
- log.warn("There's no topic.name property being set in workunt which could be an illegal state");
+ //log.warn("There's no topic.name property being set in workunt which could be an illegal state");
Review comment:
If it is not appropriate, let's remove this line.
##########
File path:
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
##########
@@ -89,8 +89,8 @@ void testJobShouldGetCancelled() throws Exception {
IntegrationJobCancelSuite.TASK_STATE_FILE)
.withValue(SleepingTask.SLEEP_TIME_IN_SECONDS,
ConfigValueFactory.fromAnyRef(100));
this.suite = new IntegrationJobCancelSuite(jobConfigOverrides);
- HelixManager helixManager = getHelixManager();
suite.startCluster();
+ HelixManager helixManager = getHelixManager();
Review comment:
It seems you could just run `git checkout -- this_file_name`, since these are not relevant changes.
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -259,6 +263,25 @@ public static void setDefaultAuthenticator(Properties properties) {
}
}
+ /**
+ * Handle {@link WorkUnitChangeEvent}, by default it will donothing
+ */
+ @Subscribe
+ public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
+ throws InvocationTargetException {
+ LOG.info("start to handle workunit change event");
+ try {
+ this.removeTaskFromRunningHelixJob(workUnitChangeEvent.getOldTaskIds());
+ this.addTaskToRunningHelixJob(workUnitChangeEvent.getNewWorkUnits());
+ } catch (Exception e) {
+ //todo: emit some event to indicate there is an error handling this event that may cause starvation
+ throw new InvocationTargetException(e);
+ }
+ }
+
+ protected void removeTaskFromRunningHelixJob(List<String> taskIdsToRemove) throws IOException {}
+ protected void addTaskToRunningHelixJob(List<WorkUnit> workUnitsToAdd) throws IOException {}
Review comment:
Should it be "tasks", or will there only ever be one task?
##########
File path:
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
##########
@@ -375,6 +378,12 @@ private void createEmptyWorkUnitsForSkippedPartitions(Map<String, List<WorkUnit>
}
}
+ private void onWorkUnitUpdate (List<String> oldTaskIds, List<WorkUnit> newWorkUnits) {
Review comment:
Where is this method called? Should it really be the Source's responsibility to detect it?
##########
File path:
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
##########
@@ -168,6 +169,8 @@
private MetricContext metricContext;
protected Optional<LineageInfo> lineageInfo;
+ @Setter
+ protected EventBus eventBus;
Review comment:
It seems awkward to set this object externally.
--
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 652048)
Time Spent: 1.5h (was: 1h 20m)
> Integrate with Helix API to add/remove task from a running helix job
> --------------------------------------------------------------------
>
> Key: GOBBLIN-1542
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1542
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Integrate with the Helix API to add/remove tasks from a running Helix job. This
> ticket also contains the API change that lets a Source contact the
> JobLauncher to notify it of changes to work units. This will enable
> us to dynamically add/delete tasks from a running job and save resources.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)