[
https://issues.apache.org/jira/browse/GOBBLIN-2010?focusedWorklogId=907852&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-907852
]
ASF GitHub Bot logged work on GOBBLIN-2010:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 01/Mar/24 19:53
Start Date: 01/Mar/24 19:53
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3886:
URL: https://github.com/apache/gobblin/pull/3886#discussion_r1509466436
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpecForBorrowingPriorState.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
+/**
+ * Same as {@link WUProcessingSpec}, but for a "stand-alone" "Work
Fulfillment-only" workflow that leverages the
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s and {@link JobState}
previously persisted by another separate
+ * job execution. Accordingly we wish to adjust our {@link
EventSubmitterContext} to reflect aspects of that original job.
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class WUProcessingSpecForBorrowingPriorState extends WUProcessingSpec {
+ @NonNull
+ private List<Tag<?>> tags = new ArrayList<>();
+ @NonNull private String metricsSuffix =
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
+
+ public WUProcessingSpecForBorrowingPriorState(URI fileSystemUri, String
workUnitsDir, EventSubmitterContext eventSubmitterContext) {
+ super(fileSystemUri, workUnitsDir, eventSubmitterContext);
+ }
+
+ @Override
+ public boolean isToDoJobLevelTiming() {
+ return true;
+ }
+
+ @Override
+ public @NonNull EventSubmitterContext getEventSubmitterContext() {
+ // NOTE: We are using the metrics tags from Job Props to create the metric
context for the timer and NOT
+ // the deserialized jobState from HDFS that is created by the real distcp
job. This is because the AZ runtime
+ // settings we want are for the job launcher that launched this Yarn job.
+ try {
+ FileSystem fs = Help.loadFileSystemForce(this);
+ JobState jobState = Help.loadJobStateUncached(this, fs);
+ List<Tag<?>> tagsFromCurrentJob = this.getTags();
+ String metricsSuffix = this.getMetricsSuffix();
+ List<Tag<?>> tags = this.calcMergedTags(tagsFromCurrentJob,
metricsSuffix, jobState);
+ return new EventSubmitterContext(tags, JobMetrics.NAMESPACE);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ private List<Tag<?>> calcMergedTags(List<Tag<?>> tagsFromCurJob, String
metricsSuffix, JobState jobStateFromHdfs) {
+ // Construct new tags list by combining subset of tags on HDFS job state
and the rest of the fields from the current job
+ Map<String, Tag<?>> tagsMap = new HashMap<>();
+ Set<String> tagKeysFromJobState = new HashSet<>(Arrays.asList(
+ TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+ TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+ TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
+ TimingEvent.FlowEventConstants.JOB_GROUP_FIELD));
+
+ // Step 1, Add tags from the AZ props using the original job (the one that
launched this yarn app)
+ tagsFromCurJob.forEach(tag -> tagsMap.put(tag.getKey(), tag));
+
+ // Step 2. Add tags from the jobState (the original MR job on HDFS)
+ List<String> targetKeysToAddSuffix =
Arrays.asList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ GobblinMetrics.getCustomTagsFromState(jobStateFromHdfs).stream()
+ .filter(tag -> tagKeysFromJobState.contains(tag.getKey()))
+ .forEach(tag -> {
+ // Step 2a (optional): Add a suffix to the FLOW_NAME_FIELD AND
FLOW_GROUP_FIELDS to prevent collisions when testing
+ String value = targetKeysToAddSuffix.contains(tag.getKey())
+ ? tag.getValue() + metricsSuffix
+ : String.valueOf(tag.getValue());
+ tagsMap.put(tag.getKey(), new Tag<>(tag.getKey(), value));
+ });
+
+ // Step 3: Overwrite any pre-existing metadata with name of the current
caller
+ tagsMap.put(GobblinMetricsKeys.CLASS_META, new
Tag<>(GobblinMetricsKeys.CLASS_META, getClass().getCanonicalName()));
+ return new ArrayList<>(tagsMap.values());
+ }
+}
Review Comment:
I didn't expect we'd ever use it long term, so I kept it here. I moved
these verbatim out of `ProcessWorkUnitWorkflowImpl` and am encapsulating this
special case of needing to mess around w/ and ultimately "spoof" the tags
entirely within this class, since I don't expect it to be needed going forward
w/ our E2E flow.
Issue Time Tracking
-------------------
Worklog Id: (was: 907852)
Time Spent: 40m (was: 0.5h)
> Implement Gobblin-on-Temporal End-to-End Workflow
> -------------------------------------------------
>
> Key: GOBBLIN-2010
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2010
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-core
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Provide a complete capability to execute a full Gobblin job through
> Temporal.io. This follows on
> https://issues.apache.org/jira/browse/GOBBLIN-1945 and
> https://issues.apache.org/jira/browse/GOBBLIN-2007 . Thereafter it will be
> possible to provide an arbitrary `JobState` config that specifies the
> intended `Source`, `Writer`, `Publisher`, `Converter` etc. in order to carry
> out whatever Data Movement is desired.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)