[ 
https://issues.apache.org/jira/browse/GOBBLIN-2010?focusedWorklogId=907852&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-907852
 ]

ASF GitHub Bot logged work on GOBBLIN-2010:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Mar/24 19:53
            Start Date: 01/Mar/24 19:53
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3886:
URL: https://github.com/apache/gobblin/pull/3886#discussion_r1509466436


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpecForBorrowingPriorState.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
+/**
+ * Same as {@link WUProcessingSpec}, but for a "stand-alone" "Work 
Fulfillment-only" workflow that leverages the
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s and {@link JobState} 
previously persisted by another separate
+ * job execution.  Accordingly we wish to adjust our {@link 
EventSubmitterContext} to reflect aspects of that original job.
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class WUProcessingSpecForBorrowingPriorState extends WUProcessingSpec {
+  @NonNull
+  private List<Tag<?>> tags = new ArrayList<>();
+  @NonNull private String metricsSuffix = 
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
+
+  public WUProcessingSpecForBorrowingPriorState(URI fileSystemUri, String 
workUnitsDir, EventSubmitterContext eventSubmitterContext) {
+    super(fileSystemUri, workUnitsDir, eventSubmitterContext);
+  }
+
+  @Override
+  public boolean isToDoJobLevelTiming() {
+    return true;
+  }
+
+  @Override
+  public @NonNull EventSubmitterContext getEventSubmitterContext() {
+    // NOTE: We are using the metrics tags from Job Props to create the metric 
context for the timer and NOT
+    // the deserialized jobState from HDFS that is created by the real distcp 
job. This is because the AZ runtime
+    // settings we want are for the job launcher that launched this Yarn job.
+    try {
+      FileSystem fs = Help.loadFileSystemForce(this);
+      JobState jobState = Help.loadJobStateUncached(this, fs);
+      List<Tag<?>> tagsFromCurrentJob = this.getTags();
+      String metricsSuffix = this.getMetricsSuffix();
+      List<Tag<?>> tags = this.calcMergedTags(tagsFromCurrentJob, 
metricsSuffix, jobState);
+      return new EventSubmitterContext(tags, JobMetrics.NAMESPACE);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private List<Tag<?>> calcMergedTags(List<Tag<?>> tagsFromCurJob, String 
metricsSuffix, JobState jobStateFromHdfs) {
+    // Construct new tags list by combining subset of tags on HDFS job state 
and the rest of the fields from the current job
+    Map<String, Tag<?>> tagsMap = new HashMap<>();
+    Set<String> tagKeysFromJobState = new HashSet<>(Arrays.asList(
+        TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+        TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+        TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
+        TimingEvent.FlowEventConstants.JOB_GROUP_FIELD));
+
+    // Step 1, Add tags from the AZ props using the original job (the one that 
launched this yarn app)
+    tagsFromCurJob.forEach(tag -> tagsMap.put(tag.getKey(), tag));
+
+    // Step 2. Add tags from the jobState (the original MR job on HDFS)
+    List<String> targetKeysToAddSuffix = 
Arrays.asList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+    GobblinMetrics.getCustomTagsFromState(jobStateFromHdfs).stream()
+        .filter(tag -> tagKeysFromJobState.contains(tag.getKey()))
+        .forEach(tag -> {
+          // Step 2a (optional): Add a suffix to the FLOW_NAME_FIELD AND 
FLOW_GROUP_FIELDS to prevent collisions when testing
+          String value = targetKeysToAddSuffix.contains(tag.getKey())
+              ? tag.getValue() + metricsSuffix
+              : String.valueOf(tag.getValue());
+          tagsMap.put(tag.getKey(), new Tag<>(tag.getKey(), value));
+        });
+
+    // Step 3: Overwrite any pre-existing metadata with name of the current 
caller
+    tagsMap.put(GobblinMetricsKeys.CLASS_META, new 
Tag<>(GobblinMetricsKeys.CLASS_META, getClass().getCanonicalName()));
+    return new ArrayList<>(tagsMap.values());
+  }
+}

Review Comment:
   I didn't expect we'd ever use it long term, so I kept it here.  I moved 
these verbatim out of `ProcessWorkUnitWorkflowImpl` and am encapsulating this 
special case of needing to mess around w/ and ultimately "spoof" the tags 
entirely within this class, since I don't expect it to be needed going forward 
w/ our E2E flow.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 907852)
    Time Spent: 40m  (was: 0.5h)

> Implement Gobblin-on-Temporal End-to-End Workflow
> -------------------------------------------------
>
>                 Key: GOBBLIN-2010
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2010
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-core
>            Reporter: Kip Kohn
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Provide a complete capability to execute a full Gobblin job through 
> Temporal.io. This follows on 
> https://issues.apache.org/jira/browse/GOBBLIN-1945 and 
> https://issues.apache.org/jira/browse/GOBBLIN-2007 .  Thereafter it will be 
> possible to provide an arbitrary `JobState` config that specifies the 
> intended `Source`, `Writer`, `Publisher`, `Converter` etc. in order to carry 
> out whatever Data Movement is desired.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to