This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 590c9c27797 Report source lineage from HadoopFormatIO (#37265)
590c9c27797 is described below

commit 590c9c2779755d94eccae21fad501274ac5e5439
Author: Andrew Kabas <[email protected]>
AuthorDate: Fri May 22 14:04:00 2026 -0400

    Report source lineage from HadoopFormatIO (#37265)
    
    * Report source lineage from HadoopFormatIO
    
    * Extract shared code into FileSystems public API
    
    * A little refactoring
    
    * Fix import in HadoopFormatIO
    
    * check style
    
    * Implement lineage in HDFS and add tests
    
    * address review comments
    
    * Address review comments
---
 .../apache/beam/sdk/io/hdfs/HadoopFileSystem.java  | 17 +++++++++++++++
 .../beam/sdk/io/hdfs/HadoopFileSystemTest.java     | 20 +++++++++++++++++
 .../beam/sdk/io/hadoop/format/HadoopFormatIO.java  | 25 ++++++++++++++++++++++
 3 files changed, 62 insertions(+)

diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 4de1d539e76..00a77aa8447 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
@@ -336,6 +337,22 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
     return scheme;
   }
 
+  @Override
+  protected void reportLineage(HadoopResourceId resourceId, Lineage lineage, LineageLevel level) {
+    URI uri = resourceId.toPath().toUri();
+    ImmutableList.Builder<String> segments = ImmutableList.builder();
+    if (uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
+      segments.add(uri.getAuthority());
+    }
+    if (level != LineageLevel.TOP_LEVEL
+        && uri.getPath() != null
+        && !uri.getPath().isEmpty()
+        && !uri.getPath().equals("/")) {
+      segments.add(uri.getPath());
+    }
+    lineage.add(scheme, segments.build(), "/");
+  }
+
   /** An adapter around {@link FSDataInputStream} that implements {@link SeekableByteChannel}. */
   private static class HadoopSeekableByteChannel implements SeekableByteChannel {
     private final FileStatus fileStatus;
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index c5918a8c9aa..e0b299401c7 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -24,6 +24,9 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.io.FileNotFoundException;
 import java.io.InputStream;
@@ -37,12 +40,14 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -481,6 +486,21 @@ public class HadoopFileSystemTest {
     p.run();
   }
 
+  @Test
+  public void testReportLineage() {
+    verifyLineage(
+        "hdfs://namenode/path/to/file.txt", ImmutableList.of("namenode", "/path/to/file.txt"));
+    verifyLineage("hdfs://namenode/", ImmutableList.of("namenode"));
+    verifyLineage("hdfs://namenode", ImmutableList.of("namenode"));
+  }
+
+  private void verifyLineage(String uri, List<String> expected) {
+    HadoopResourceId resourceId = new HadoopResourceId(URI.create(uri));
+    Lineage mockLineage = mock(Lineage.class);
+    fileSystem.reportLineage(resourceId, mockLineage, FileSystem.LineageLevel.FILE);
+    verify(mockLineage, times(1)).add("hdfs", expected, "/");
+  }
+
   private void create(String relativePath, byte[] contents) throws Exception {
     try (WritableByteChannel channel =
         fileSystem.create(
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
index 0412e4286bb..501040928e4 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
@@ -34,6 +34,7 @@ import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -54,6 +55,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.io.hadoop.WritableCoder;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
@@ -97,6 +99,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@@ -725,6 +728,7 @@ public class HadoopFormatIO {
         return ImmutableList.of(this);
       }
       computeSplitsIfNecessary();
+      reportSourceLineage(inputSplits);
       LOG.info(
           "Generated {} splits. Size of first split is {} ",
           inputSplits.size(),
@@ -744,6 +748,27 @@ public class HadoopFormatIO {
           .collect(Collectors.toList());
     }
 
+    private void reportSourceLineage(final List<SerializableSplit> inputSplits) {
+      for (SerializableSplit serializableSplit : inputSplits) {
+        InputSplit split = serializableSplit.getSplit();
+        if (split instanceof FileSplit) {
+          URI uri = ((FileSplit) split).getPath().toUri();
+          String scheme = uri.getScheme();
+          if (scheme == null) {
+            continue;
+          }
+          ImmutableList.Builder<String> segments = ImmutableList.builder();
+          if (uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
+            segments.add(uri.getAuthority());
+          }
+          if (uri.getPath() != null && !uri.getPath().isEmpty() && !uri.getPath().equals("/")) {
+            segments.add(uri.getPath());
+          }
+          Lineage.getSources().add(scheme, segments.build(), "/");
+        }
+      }
+    }
+
     @Override
     public long getEstimatedSizeBytes(PipelineOptions po) throws Exception {
       if (inputSplit == null) {

Reply via email to