[
https://issues.apache.org/jira/browse/GOBBLIN-2200?focusedWorklogId=967170&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-967170
]
ASF GitHub Bot logged work on GOBBLIN-2200:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 23/Apr/25 05:05
Start Date: 23/Apr/25 05:05
Worklog Time Spent: 10m
Work Description: abhishekmjain commented on code in PR #4108:
URL: https://github.com/apache/gobblin/pull/4108#discussion_r2055251377
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java:
##########
@@ -38,6 +38,7 @@ public class AzkabanTags {
.put(ConfigurationKeys.AZKABAN_FLOW_ID, "azkabanFlowId")
.put(ConfigurationKeys.AZKABAN_JOB_ID, "azkabanJobId")
.put(ConfigurationKeys.AZKABAN_EXEC_ID, "azkabanExecId")
+ .put(ConfigurationKeys.GAAS_JOB_EXEC_ID,"gaasJobExecId")
Review Comment:
nit: let's put a space after ,
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -78,6 +80,27 @@ public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException
Mockito.verifyNoMoreInteractions(dagManagementStateStore);
}
+ @Test
+ public void testGaaSJobExecutionIdInjection() throws URISyntaxException, IOException {
+ Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+ List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+ List<Dag.DagNode<JobExecutionPlan>> dagNodeList = jobExecutionPlans.stream()
+ .map(Dag.DagNode<JobExecutionPlan>::new)
+ .collect(Collectors.toList());
+ Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+ Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+ DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+ // Assertion to test that GaaS job execution Id has been successfully injected
+ for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+ final String gaasJobExecutionId = ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),ConfigurationKeys.GAAS_JOB_EXEC_ID,"");
+ final Long gaasJobExecutionIdHash = Long.parseLong(ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH,""));
Review Comment:
nit: space after comma
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1047,6 +1047,12 @@ public class ConfigurationKeys {
public static final String AZKABAN_FLOW_ID = "azkaban.flow.flowid";
public static final String AZKABAN_JOB_ID = "azkaban.job.id";
public static final String AZKABAN_EXEC_ID = "azkaban.flow.execid";
+ // Configuration Key for setting a unique job execution identifier in GaaS, the value is a UUID
+ public static final String GAAS_JOB_EXEC_ID = "gaas.job.execid";
+
+ // Configuration Key for storing hash of gaas.job.execid, to be used as jobExecutionId(integer) for backwards compatibility
+ public static final String GAAS_JOB_EXEC_ID_HASH = "gaas.job.executionid.hash";
Review Comment:
why does the rename work?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -78,6 +80,27 @@ public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException
Mockito.verifyNoMoreInteractions(dagManagementStateStore);
}
+ @Test
+ public void testGaaSJobExecutionIdInjection() throws URISyntaxException, IOException {
+ Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+ List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+ List<Dag.DagNode<JobExecutionPlan>> dagNodeList = jobExecutionPlans.stream()
+ .map(Dag.DagNode<JobExecutionPlan>::new)
+ .collect(Collectors.toList());
+ Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+ Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+ DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+ // Assertion to test that GaaS job execution Id has been successfully injected
+ for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+ final String gaasJobExecutionId = ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),ConfigurationKeys.GAAS_JOB_EXEC_ID,"");
Review Comment:
nit: can we use `StringUtils.EMPTY` instead of "" ?
Issue Time Tracking
-------------------
Worklog Id: (was: 967170)
Time Spent: 1h 20m (was: 1h 10m)
> Moving Away From Azkaban Execution Id
> -------------------------------------
>
> Key: GOBBLIN-2200
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2200
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Aditya Pratap Singh
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Moving Away From Azkaban Execution Id
> In several places we use the azkaban.flow.execid config value to identify the
> job execution. Since we don't want to tie Gobblin to Azkaban, this change
> introduces a new field, gaas.job.execid, which serves as the configuration key
> for a unique job execution identifier in GaaS. The value is a UUID.
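
As a rough illustration of the description above, the following is a minimal, self-contained sketch of how a UUID-valued gaas.job.execid and a numeric companion under gaas.job.executionid.hash might be written into job properties. The two key names come from the diff in this thread; the class name, the helper methods, the use of java.util.Properties, and in particular the XOR-based hash are assumptions made for illustration. The hash function actually used in the PR is not shown in this thread.

```java
import java.util.Properties;
import java.util.UUID;

public class GaasExecIdSketch {
  // Key names copied from the ConfigurationKeys diff quoted in this thread.
  static final String GAAS_JOB_EXEC_ID = "gaas.job.execid";
  static final String GAAS_JOB_EXEC_ID_HASH = "gaas.job.executionid.hash";

  // Hypothetical hash: folds the 128 bits of the UUID into a single long.
  // This is NOT confirmed to be the function used by the PR.
  static long hashOf(String uuidString) {
    UUID uuid = UUID.fromString(uuidString);
    return uuid.getMostSignificantBits() ^ uuid.getLeastSignificantBits();
  }

  // Generates a fresh UUID and stores both the UUID and its numeric hash,
  // so that consumers expecting a numeric execution id can still parse one.
  static Properties injectExecId(Properties jobProps) {
    String execId = UUID.randomUUID().toString();
    jobProps.setProperty(GAAS_JOB_EXEC_ID, execId);
    jobProps.setProperty(GAAS_JOB_EXEC_ID_HASH, String.valueOf(hashOf(execId)));
    return jobProps;
  }

  public static void main(String[] args) {
    Properties props = injectExecId(new Properties());
    String execId = props.getProperty(GAAS_JOB_EXEC_ID, "");
    // Mirrors the Long.parseLong(...) read in the test under review.
    long hash = Long.parseLong(props.getProperty(GAAS_JOB_EXEC_ID_HASH));
    System.out.println(execId + " -> " + hash);
  }
}
```

Storing the hash as a string and reading it back with Long.parseLong matches how the test in the diff reads GAAS_JOB_EXEC_ID_HASH. Any deterministic function from UUID to long would fit this sketch equally well.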
--
This message was sent by Atlassian Jira
(v8.20.10#820010)