gemini-code-assist[bot] commented on code in PR #37265:
URL: https://github.com/apache/beam/pull/37265#discussion_r2674710831
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java:
##########
@@ -398,6 +399,37 @@ public ResourceId apply(@Nonnull Metadata input) {
.delete(resourceIdsToDelete);
}
+ /**
+ * Report source {@link Lineage} metrics for multiple resource ids. Due to the size limit of Beam
+ * metrics, report full file name or only dir depend on the number of files.
+ *
+ * <p>- Number of files<=100, report full file paths;
+ *
+ * <p>- Number of directory<=100, report directory names (one level up);
+ *
+ * <p>- Otherwise, report top level only.
+ */
+ public static void reportSourceLineage(List<ResourceId> resourceIds) {
+ if (resourceIds.size() <= 100) {
+ for (ResourceId resourceId : resourceIds) {
+ FileSystems.reportSourceLineage(resourceId);
+ }
+ } else {
+ HashSet<ResourceId> uniqueDirs = new HashSet<>();
+ for (ResourceId resourceId : resourceIds) {
+ ResourceId dir = resourceId.getCurrentDirectory();
+ uniqueDirs.add(dir);
+ if (uniqueDirs.size() > 100) {
+ FileSystems.reportSourceLineage(dir, LineageLevel.TOP_LEVEL);
+ return;
+ }
+ }
+ for (ResourceId uniqueDir : uniqueDirs) {
+ FileSystems.reportSourceLineage(uniqueDir);
+ }
+ }
+ }
Review Comment:

The magic number `100` is used multiple times for thresholding. It would be
better to define it as a constant to improve readability and maintainability. A
class-level constant would be ideal, but a local constant within the method is
also an improvement.
```java
public static void reportSourceLineage(List<ResourceId> resourceIds) {
  final int MAX_LINEAGE_TARGETS = 100;
  if (resourceIds.size() <= MAX_LINEAGE_TARGETS) {
    for (ResourceId resourceId : resourceIds) {
      FileSystems.reportSourceLineage(resourceId);
    }
  } else {
    HashSet<ResourceId> uniqueDirs = new HashSet<>();
    for (ResourceId resourceId : resourceIds) {
      ResourceId dir = resourceId.getCurrentDirectory();
      uniqueDirs.add(dir);
      if (uniqueDirs.size() > MAX_LINEAGE_TARGETS) {
        FileSystems.reportSourceLineage(dir, LineageLevel.TOP_LEVEL);
        return;
      }
    }
    for (ResourceId uniqueDir : uniqueDirs) {
      FileSystems.reportSourceLineage(uniqueDir);
    }
  }
}
```
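For the class-level variant mentioned above, a minimal standalone sketch may help show the shape. It is not Beam code: plain `String` paths stand in for `ResourceId`, and `LineageThresholdSketch`, `parentOf`, `topLevelOf`, and `targetsToReport` are hypothetical names chosen for illustration. Only the three-tier thresholding from the Javadoc is mirrored, with both checks sharing one constant.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class LineageThresholdSketch {
  // Hypothetical class-level constant shared by both threshold checks.
  private static final int MAX_LINEAGE_TARGETS = 100;

  // Stand-in for ResourceId#getCurrentDirectory(): strips the last path segment.
  public static String parentOf(String path) {
    int idx = path.lastIndexOf('/');
    return idx <= 0 ? "/" : path.substring(0, idx);
  }

  // Stand-in for top-level reporting: keeps only the first path segment.
  public static String topLevelOf(String path) {
    int idx = path.indexOf('/', 1);
    return idx < 0 ? path : path.substring(0, idx);
  }

  // Returns what would be reported: full paths, unique directories, or one top-level entry.
  public static List<String> targetsToReport(List<String> paths) {
    if (paths.size() <= MAX_LINEAGE_TARGETS) {
      return new ArrayList<>(paths);
    }
    Set<String> uniqueDirs = new LinkedHashSet<>();
    for (String path : paths) {
      String dir = parentOf(path);
      uniqueDirs.add(dir);
      if (uniqueDirs.size() > MAX_LINEAGE_TARGETS) {
        // Too many directories: fall back to a single top-level entry.
        List<String> topLevel = new ArrayList<>();
        topLevel.add(topLevelOf(dir));
        return topLevel;
      }
    }
    return new ArrayList<>(uniqueDirs);
  }

  public static void main(String[] args) {
    List<String> paths = new ArrayList<>();
    for (int i = 0; i < 250; i++) {
      paths.add("/data/part-" + (i % 3) + "/file-" + i);
    }
    // 250 files spread over 3 directories, so directories are reported.
    System.out.println(targetsToReport(paths));
  }
}
```

With the limit defined once at class level, changing it (or referencing it from the Javadoc) touches a single line instead of two literals.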
##########
sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java:
##########
@@ -744,6 +752,23 @@ public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes, Pipeline
.collect(Collectors.toList());
}
+ /** Report only file-based sources */
+ private void reportSourceLineage(final List<SerializableSplit> inputSplits) {
+ List<ResourceId> fileResources = new ArrayList<>();
+
+ for (SerializableSplit split : inputSplits) {
+ InputSplit inputSplit = split.getSplit();
+
+ if (inputSplit instanceof FileSplit) {
+ String pathString = ((FileSplit) inputSplit).getPath().toString();
+ ResourceId resourceId = FileSystems.matchNewResource(pathString, false);
+ fileResources.add(resourceId);
+ }
+ }
+
+ FileSystems.reportSourceLineage(fileResources);
Review Comment:

This loop can be refactored to use Java Streams for a more concise and
declarative style.
```java
List<ResourceId> fileResources =
    inputSplits.stream()
        .map(SerializableSplit::getSplit)
        .filter(FileSplit.class::isInstance)
        .map(FileSplit.class::cast)
        .map(fileSplit ->
            FileSystems.matchNewResource(fileSplit.getPath().toString(), false))
        .collect(Collectors.toList());
FileSystems.reportSourceLineage(fileResources);
```
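The filter-then-cast pattern in this suggestion can be tried in isolation. The sketch below is a standalone illustration, not the Hadoop or Beam API: `Split`, `FileBackedSplit`, and `OtherSplit` are hypothetical stand-ins for `InputSplit`, `FileSplit`, and any non-file split. It shows that `Class::isInstance` followed by `Class::cast` keeps only the file-backed elements, in their original order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterCastSketch {
  // Hypothetical stand-in for InputSplit.
  public interface Split {}

  // Hypothetical stand-in for FileSplit: a split that carries a path.
  public static final class FileBackedSplit implements Split {
    private final String path;

    public FileBackedSplit(String path) {
      this.path = path;
    }

    public String getPath() {
      return path;
    }
  }

  // Hypothetical stand-in for a split with no file path.
  public static final class OtherSplit implements Split {}

  // Keeps only file-backed splits and extracts their paths.
  public static List<String> filePaths(List<Split> splits) {
    return splits.stream()
        .filter(FileBackedSplit.class::isInstance)
        .map(FileBackedSplit.class::cast)
        .map(FileBackedSplit::getPath)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Split> splits =
        Arrays.asList(
            new FileBackedSplit("/data/a"), new OtherSplit(), new FileBackedSplit("/data/b"));
    System.out.println(filePaths(splits));
  }
}
```

Non-matching elements are dropped silently, which matches the original loop's behavior of skipping splits that are not `FileSplit` instances.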