shnapz commented on code in PR #37265:
URL: https://github.com/apache/beam/pull/37265#discussion_r2677689623
##########
sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java:
##########
@@ -744,6 +752,23 @@ public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes, Pipeline
.collect(Collectors.toList());
}
+ /** Report only file-based sources */
+ private void reportSourceLineage(final List<SerializableSplit> inputSplits) {
+ List<ResourceId> fileResources = new ArrayList<>();
+
+ for (SerializableSplit split : inputSplits) {
+ InputSplit inputSplit = split.getSplit();
+
+ if (inputSplit instanceof FileSplit) {
+ String pathString = ((FileSplit) inputSplit).getPath().toString();
+ ResourceId resourceId = FileSystems.matchNewResource(pathString, false);
+ fileResources.add(resourceId);
+ }
+ }
+
+ FileSystems.reportSourceLineage(fileResources);
Review Comment:
yes, done
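For anyone following the thread, here is a dependency-free sketch of the filtering step in `reportSourceLineage(List<SerializableSplit>)`: only file-backed splits contribute a path, and every other split type is skipped. `InputSplitLike`, `FileSplitLike` and `OtherSplitLike` are hypothetical stand-ins for Hadoop's `InputSplit`/`FileSplit`, and paths are plain strings rather than Beam `ResourceId`s. Treat it as an illustration under those assumptions, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the split filtering: only file-backed splits yield a path.
 * InputSplitLike, FileSplitLike and OtherSplitLike are NOT real Hadoop or Beam classes.
 */
public class SplitLineageSketch {
  interface InputSplitLike {}

  /** Stand-in for a file-backed split (Hadoop's FileSplit in the real code). */
  static final class FileSplitLike implements InputSplitLike {
    private final String path;

    FileSplitLike(String path) {
      this.path = path;
    }

    String getPath() {
      return path;
    }
  }

  /** Stand-in for any non-file split, e.g. one backed by a database query. */
  static final class OtherSplitLike implements InputSplitLike {}

  /** Collects the paths of file-backed splits in input order; other splits are ignored. */
  static List<String> filePaths(List<? extends InputSplitLike> splits) {
    List<String> paths = new ArrayList<>();
    for (InputSplitLike split : splits) {
      if (split instanceof FileSplitLike) {
        paths.add(((FileSplitLike) split).getPath());
      }
    }
    return paths;
  }

  public static void main(String[] args) {
    List<InputSplitLike> splits = new ArrayList<>();
    splits.add(new FileSplitLike("hdfs://nn/data/part-0"));
    splits.add(new OtherSplitLike());
    splits.add(new FileSplitLike("hdfs://nn/data/part-1"));
    System.out.println(filePaths(splits)); // [hdfs://nn/data/part-0, hdfs://nn/data/part-1]
  }
}
```

In the diff the collected paths are then handed to `FileSystems.reportSourceLineage(List<ResourceId>)`, so a job reading only non-file splits reports an empty list.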
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java:
##########
@@ -398,6 +399,37 @@ public ResourceId apply(@Nonnull Metadata input) {
.delete(resourceIdsToDelete);
}
+ /**
+ * Report source {@link Lineage} metrics for multiple resource ids. Due to the size limit of Beam
+ * metrics, report full file paths or only directories, depending on the number of files.
+ *
+ * <p>- Number of files<=100, report full file paths;
+ *
+ * <p>- Number of directories<=100, report directory names (one level up);
+ *
+ * <p>- Otherwise, report top level only.
+ */
+ public static void reportSourceLineage(List<ResourceId> resourceIds) {
+ if (resourceIds.size() <= 100) {
+ for (ResourceId resourceId : resourceIds) {
+ FileSystems.reportSourceLineage(resourceId);
+ }
+ } else {
+ HashSet<ResourceId> uniqueDirs = new HashSet<>();
+ for (ResourceId resourceId : resourceIds) {
+ ResourceId dir = resourceId.getCurrentDirectory();
+ uniqueDirs.add(dir);
+ if (uniqueDirs.size() > 100) {
+ FileSystems.reportSourceLineage(dir, LineageLevel.TOP_LEVEL);
+ return;
+ }
+ }
+ for (ResourceId uniqueDir : uniqueDirs) {
+ FileSystems.reportSourceLineage(uniqueDir);
+ }
+ }
+ }
Review Comment:
done
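A dependency-free sketch of the three-tier policy in `FileSystems.reportSourceLineage(List<ResourceId>)`, returning what would be reported instead of emitting metrics. Paths are plain strings, and `parentDir`/`topLevel` are hypothetical helpers that approximate `ResourceId.getCurrentDirectory()` and `LineageLevel.TOP_LEVEL` reporting, assuming `scheme://authority/...` paths. This is an illustration of the logic in the diff, not Beam's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, string-based model of the tiered lineage reporting policy. */
public class TieredLineageSketch {
  static final int LIMIT = 100; // same threshold as the diff

  /** Hypothetical helper: "gs://b/x/y.txt" -> "gs://b/x/". */
  static String parentDir(String path) {
    int slash = path.lastIndexOf('/');
    return slash < 0 ? path : path.substring(0, slash + 1);
  }

  /** Hypothetical helper: "gs://b/x/y.txt" -> "gs://b" (assumes scheme://authority/...). */
  static String topLevel(String path) {
    int schemeEnd = path.indexOf("://");
    int start = schemeEnd < 0 ? 0 : schemeEnd + 3;
    int slash = path.indexOf('/', start);
    return slash < 0 ? path : path.substring(0, slash);
  }

  /** Returns the entries that would be reported for the given file paths. */
  static List<String> entriesToReport(List<String> files) {
    List<String> reported = new ArrayList<>();
    if (files.size() <= LIMIT) {
      reported.addAll(files); // tier 1: full file paths
      return reported;
    }
    Set<String> uniqueDirs = new LinkedHashSet<>();
    for (String file : files) {
      String dir = parentDir(file);
      uniqueDirs.add(dir);
      if (uniqueDirs.size() > LIMIT) {
        reported.add(topLevel(dir)); // tier 3: one top-level entry, then stop
        return reported;
      }
    }
    reported.addAll(uniqueDirs); // tier 2: one entry per distinct directory
    return reported;
  }

  public static void main(String[] args) {
    List<String> files = new ArrayList<>();
    for (int i = 0; i < 150; i++) {
      files.add("gs://bucket/dir" + (i % 3) + "/file" + i);
    }
    System.out.println(entriesToReport(files)); // [gs://bucket/dir0/, gs://bucket/dir1/, gs://bucket/dir2/]
  }
}
```

`LinkedHashSet` is used only to make the printed order deterministic; the diff's `HashSet` yields the same directories in unspecified order. As in the diff, tier 3 reports the top level of just the directory that crossed the limit, which presumes the inputs share a top level (for example, a single bucket).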
--
This is an automated message from the Apache Git Service.