Blazer-007 commented on code in PR #4082:
URL: https://github.com/apache/gobblin/pull/4082#discussion_r1878128292
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java:
##########
@@ -66,13 +66,25 @@ public class JobLauncherUtils {
public static class WorkUnitPathCalculator {
private final AtomicInteger nextMultiWorkUnitTaskId = new AtomicInteger(0);
- // Serialize each work unit into a file named after the task ID
+ /** @return `Path` beneath `basePath` to serialize `workUnit`, with file named after the task ID (itself named after the job ID) */
public Path calcNextPath(WorkUnit workUnit, String jobId, Path basePath) {
String workUnitFileName = workUnit.isMultiWorkUnit()
? JobLauncherUtils.newMultiTaskId(jobId, nextMultiWorkUnitTaskId.getAndIncrement()) + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
: workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
return new Path(basePath, workUnitFileName);
}
+
+ /**
+ * Calc where to serialize {@link WorkUnit}, using a filename that tunnels {@link WorkUnitSizeInfo}, vs. repeating the task/job ID, as was legacy practice
+ * @return `Path` beneath `basePath` to serialize `workUnit`
+ */
+ public Path calcNextPathWithTunneledSizeInfo(WorkUnit workUnit, String jobId, Path basePath) {
+ String encodedSizeInfo = WorkUnitSizeInfo.forWorkUnit(workUnit).encode();
+ String workUnitFileName = workUnit.isMultiWorkUnit()
+ ? Id.MultiTask.create(encodedSizeInfo, nextMultiWorkUnitTaskId.getAndIncrement()) + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
+ : Id.Task.create(encodedSizeInfo, workUnit.getPropAsInt(ConfigurationKeys.TASK_KEY_KEY)) + JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
+ return new Path(basePath, workUnitFileName);
+ }
Review Comment:
Using the encoded size info in place of the job ID is a great improvement, since the job ID was being repeated everywhere.
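The point of tunneling size info through the filename is, presumably, that a reader can learn something about a work unit's size from its path alone, without deserializing the file. A minimal sketch of that idea, assuming a made-up `n<count>_t<totalSize>` naming scheme and `.wu`/`.mwu` extensions; neither is the actual output of `WorkUnitSizeInfo#encode()` or of `Id.Task`/`Id.MultiTask` in this PR:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch of "tunneling" size info through a work-unit filename.
 * The naming scheme and extensions are illustrative assumptions, NOT the
 * format produced by WorkUnitSizeInfo#encode() or Id.Task / Id.MultiTask.
 */
public class SizeInfoFilenameSketch {
  private static final Pattern NAME =
      Pattern.compile("^(multitask|task)_n(\\d+)_t(\\d+)_(\\d+)\\.(mwu|wu)$");

  /** Builds a filename carrying constituent count and total size, plus a sequence number for uniqueness. */
  static String fileName(boolean isMulti, int numConstituents, long totalSize, int seq) {
    String prefix = isMulti ? "multitask" : "task";
    String ext = isMulti ? ".mwu" : ".wu"; // assumed extensions
    return prefix + "_n" + numConstituents + "_t" + totalSize + "_" + seq + ext;
  }

  /** Recovers {numConstituents, totalSize} from the filename alone, without reading the file. */
  static long[] parse(String fileName) {
    Matcher m = NAME.matcher(fileName);
    if (!m.matches()) {
      throw new IllegalArgumentException("unrecognized work-unit filename: " + fileName);
    }
    return new long[] {Long.parseLong(m.group(2)), Long.parseLong(m.group(3))};
  }

  public static void main(String[] args) {
    String name = fileName(true, 3, 1024L, 7);
    long[] info = parse(name);
    // prints "multitask_n3_t1024_7.mwu -> 3 constituents, total size 1024"
    System.out.println(name + " -> " + info[0] + " constituents, total size " + info[1]);
  }
}
```

The sequence number plays the role that `nextMultiWorkUnitTaskId` plays in the diff: two work units with identical size info still get distinct paths.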
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.tdunning.math.stats.TDigest;
+
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/** Bare-bones size information about a {@link WorkUnit}, possibly a {@link MultiWorkUnit} */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class WorkUnitSizeInfo {
+ // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
+ @NonNull private int numConstituents;
+ @NonNull private long totalSize;
+ @NonNull private double medianSize;
+ @NonNull private double meanSize;
+ @NonNull private double stddevSize;
+
+ /** @return the 'zero' {@link WorkUnitSizeInfo} */
+ public static WorkUnitSizeInfo empty() {
+ return new WorkUnitSizeInfo(0, 0, 0.0, 0.0, 0.0);
+ }
+
+ /**
+ * convenience factory to measure a {@link WorkUnit} - preferable to direct ctor call
+ * @returns {@link #empty()} when the `WorkUnit` is not measurable by defining {@link ServiceConfigKeys#WORK_UNIT_SIZE}
+ */
+ public static WorkUnitSizeInfo forWorkUnit(WorkUnit workUnit) {
+ if (!workUnit.isMultiWorkUnit()) {
+ long wuSize = workUnit.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0);
+ return new WorkUnitSizeInfo(1, wuSize, wuSize, wuSize, 0.0);
Review Comment:
Is `WORK_UNIT_SIZE` populated by all types of source? Checking the code, I see only `CopySource` populating this value, so is my understanding correct that for other types of source this value will always be 0?
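If that reading is right, any source that never sets the key falls through to the `0` default passed to `getPropAsLong(...)` in the single-work-unit branch above. A small stand-in sketch makes the consequence concrete. It uses `java.util.Properties` in place of Gobblin's `WorkUnit`, and the key string is a hypothetical placeholder, not the real value of `ServiceConfigKeys.WORK_UNIT_SIZE`:

```java
import java.util.Arrays;
import java.util.Properties;

/**
 * Hypothetical stand-in for the single-work-unit branch of WorkUnitSizeInfo.forWorkUnit().
 * Properties replaces Gobblin's WorkUnit; the key string is a placeholder.
 */
public class SizeDefaultSketch {
  static final String WORK_UNIT_SIZE_KEY = "hypothetical.work.unit.size";

  /** Stands in for getPropAsLong(key, 0): a work unit whose source never set the key yields 0. */
  static long sizeOf(Properties wuProps) {
    return Long.parseLong(wuProps.getProperty(WORK_UNIT_SIZE_KEY, "0"));
  }

  /** Returns {numConstituents, totalSize, median, mean, stddev}, matching `new WorkUnitSizeInfo(1, wuSize, wuSize, wuSize, 0.0)`. */
  static double[] forSingleWorkUnit(Properties wuProps) {
    long wuSize = sizeOf(wuProps);
    return new double[] {1, wuSize, wuSize, wuSize, 0.0};
  }

  public static void main(String[] args) {
    Properties sized = new Properties();
    sized.setProperty(WORK_UNIT_SIZE_KEY, "4096"); // a source that records sizes
    Properties unsized = new Properties();         // a source that never sets the key

    System.out.println(Arrays.toString(forSingleWorkUnit(sized)));   // [1.0, 4096.0, 4096.0, 4096.0, 0.0]
    System.out.println(Arrays.toString(forSingleWorkUnit(unsized))); // [1.0, 0.0, 0.0, 0.0, 0.0]
  }
}
```

Under that assumption, for such sources only the constituent count of 1 carries information; all the size statistics collapse to 0.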
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java:
##########
@@ -0,0 +1,127 @@
+/** Bare-bones size information about a {@link WorkUnit}, possibly a {@link MultiWorkUnit} */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class WorkUnitSizeInfo {
+ // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
+ @NonNull private int numConstituents;
+ @NonNull private long totalSize;
Review Comment:
Maybe add a comment here describing what `numConstituents` refers to, similar to the one above in `WorkUnitsSizeSummary`: `a constituent work unit is one with no children - a leaf`.
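The distinction matters for nested work units: if a constituent is a leaf (one with no children), `numConstituents` counts leaves rather than the direct children of a `MultiWorkUnit`. A minimal sketch under that definition, using a hypothetical `Node` type as a stand-in for `WorkUnit`/`MultiWorkUnit` and a recursive walk that also sums leaf sizes, as `totalSize` presumably does:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: "constituent" = leaf work unit. Node is NOT a Gobblin type. */
public class ConstituentCountSketch {
  /** Illustrative stand-in for WorkUnit (leaf) and MultiWorkUnit (has children). */
  static final class Node {
    final long size;           // only read for leaves in this sketch
    final List<Node> children; // empty list means "leaf"

    private Node(long size, List<Node> children) {
      this.size = size;
      this.children = children;
    }

    static Node leaf(long size) {
      return new Node(size, Collections.emptyList());
    }

    // Note: a container built with no children would be treated as a leaf here.
    static Node multi(Node... children) {
      return new Node(0L, Arrays.asList(children));
    }
  }

  /** Counts leaves reachable from `n`; a lone leaf counts as 1. */
  static int numConstituents(Node n) {
    if (n.children.isEmpty()) {
      return 1;
    }
    int count = 0;
    for (Node child : n.children) {
      count += numConstituents(child);
    }
    return count;
  }

  /** Sums leaf sizes; container nodes contribute no size of their own. */
  static long totalSize(Node n) {
    if (n.children.isEmpty()) {
      return n.size;
    }
    long total = 0L;
    for (Node child : n.children) {
      total += totalSize(child);
    }
    return total;
  }

  public static void main(String[] args) {
    // Two direct children, but three leaves.
    Node mwu = Node.multi(Node.leaf(10), Node.multi(Node.leaf(20), Node.leaf(30)));
    System.out.println(numConstituents(mwu)); // 3
    System.out.println(totalSize(mwu));       // 60
  }
}
```

Counting direct children instead would report 2 for the same tree, which is exactly the ambiguity a one-line field comment would settle.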