[
https://issues.apache.org/jira/browse/GOBBLIN-2153?focusedWorklogId=934379&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934379
]
ASF GitHub Bot logged work on GOBBLIN-2153:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 12/Sep/24 01:24
Start Date: 12/Sep/24 01:24
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #4052:
URL: https://github.com/apache/gobblin/pull/4052#discussion_r1755945665
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtils.java:
##########
@@ -0,0 +1,52 @@
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.NonNull;
+import lombok.experimental.UtilityClass;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+
+
+/**
+ * Utility class for handling Temporal workflow-related operations.
+ */
+@UtilityClass
+public class TemporalWorkFlowUtils {
+
+ /**
+ * Generates search attributes for a WorkFlow based on the provided GAAS job properties.
+ *
+ * @param jobProps the properties of the job, must not be null.
+ * @return a map containing the generated search attributes.
+ */
+ public static Map<String, Object> generateGaasSearchAttributes(@NonNull Properties jobProps) {
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put(Help.GAAS_FLOW_KEY, String.format("%s.%s", jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY),
Review Comment:
We should call this GAAS_FLOW_ID_SEARCH_KEY, since flowGroup + flowName is the
unique identifier for a given flow config in GaaS.
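For illustration, the rename could look roughly like the sketch below. The
constant value and the two property-key strings are stand-ins chosen for this
example (the real ones live in Help and ConfigurationKeys and are not shown in
this diff), and the use of the flow name as the second component follows the
comment above rather than the truncated hunk.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class SearchAttributeSketch {
  // Hypothetical stand-ins: the actual values are not visible in this diff.
  static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey";
  static final String FLOW_GROUP_KEY = "flow.group";
  static final String FLOW_NAME_KEY = "flow.name";

  // Builds a single search attribute of the form "<flowGroup>.<flowName>".
  static Map<String, Object> generateGaasSearchAttributes(Properties jobProps) {
    Map<String, Object> attributes = new HashMap<>();
    attributes.put(GAAS_FLOW_ID_SEARCH_KEY, String.format("%s.%s",
        jobProps.getProperty(FLOW_GROUP_KEY),
        jobProps.getProperty(FLOW_NAME_KEY)));
    return attributes;
  }
}
```

The name then says what the value identifies (a flow ID used as a search key)
instead of the vaguer "flow key".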
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -57,21 +59,25 @@ public CommitStats process(WUProcessingSpec workSpec) {
}
private CommitStats performWork(WUProcessingSpec workSpec) {
- Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
- NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec);
- int workunitsProcessed = processingWorkflow.performWorkload(
- WorkflowAddr.ROOT, workload, 0,
- workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
- );
- if (workunitsProcessed > 0) {
- CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
- CommitStats result = commitWorkflow.commit(workSpec);
- if (result.getNumCommittedWorkUnits() == 0) {
- log.warn("No work units committed at the job level. They could have been committed at the task level.");
+ try {
+ Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, Help.loadFileSystem(workSpec));
+ NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, jobState);
+ int workunitsProcessed = processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+ workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
+ if (workunitsProcessed > 0) {
+ CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(jobState);
+ CommitStats result = commitWorkflow.commit(workSpec);
+ if (result.getNumCommittedWorkUnits() == 0) {
+ log.warn("No work units committed at the job level. They could have been committed at the task level.");
+ }
+ return result;
+ } else {
+ log.error("No work units processed, so no commit attempted.");
+ return CommitStats.createEmpty();
}
- return result;
- } else {
- log.error("No work units processed, so no commit attempted.");
+ } catch (Exception ignored) {
+ log.error("Exception occured during performing Work", ignored);
return CommitStats.createEmpty();
Review Comment:
What's the rationale for this try/catch? It seems dangerous to ignore
exceptions during the ProcessWorkUnits step. Shouldn't we want it to fail
loudly?
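To make the concern concrete, here is a minimal standalone sketch (not the
PR's code). Stats and Work are hypothetical stand-ins for CommitStats and the
workload execution. With the catch-all, a failed run returns the same empty
result as a run that had nothing to process; without it, the failure reaches
the caller.

```java
class FailLoudlySketch {
  // Hypothetical stand-in for CommitStats; only a committed count is modeled.
  static final class Stats {
    final int committed;
    Stats(int committed) { this.committed = committed; }
    static Stats createEmpty() { return new Stats(0); }
  }

  // Hypothetical stand-in for running the workload; returns units processed.
  interface Work { int run(); }

  // Catch-all variant: an exception is turned into an empty result,
  // so the caller cannot tell "failed" apart from "nothing to do".
  static Stats performWorkSwallowing(Work work) {
    try {
      int processed = work.run();
      return processed > 0 ? new Stats(processed) : Stats.createEmpty();
    } catch (Exception e) {
      return Stats.createEmpty();
    }
  }

  // Fail-loudly variant: no catch, so any exception propagates to the caller.
  static Stats performWork(Work work) {
    int processed = work.run();
    return processed > 0 ? new Stats(processed) : Stats.createEmpty();
  }
}
```

If some cleanup or logging is still wanted at this level, logging and then
rethrowing would keep the failure visible.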
Issue Time Tracking
-------------------
Worklog Id: (was: 934379)
Time Spent: 50m (was: 40m)
> Add SearchAttributes to filter Temporal Flows in the UI
> -------------------------------------------------------
>
> Key: GOBBLIN-2153
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2153
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Aditya Pratap Singh
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Add SearchAttributes to filter Temporal Flows in the UI