This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 590c9c27797 Report source lineage from HadoopFormatIO (#37265)
590c9c27797 is described below
commit 590c9c2779755d94eccae21fad501274ac5e5439
Author: Andrew Kabas <[email protected]>
AuthorDate: Fri May 22 14:04:00 2026 -0400
Report source lineage from HadoopFormatIO (#37265)
* Report source lineage from HadoopFormatIO
* Extract shared code into FileSystems public API
* A little refactoring
* Fix import in HadoopFormatIO
* Fix checkstyle issues
* Implement lineage in HDFS and add tests
* Address review comments
* Address review comments
---
.../apache/beam/sdk/io/hdfs/HadoopFileSystem.java | 17 +++++++++++++++
.../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 20 +++++++++++++++++
.../beam/sdk/io/hadoop/format/HadoopFormatIO.java | 25 ++++++++++++++++++++++
3 files changed, 62 insertions(+)
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 4de1d539e76..00a77aa8447 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.metrics.Lineage;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
@@ -336,6 +337,22 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
return scheme;
}
+  /**
+   * Reports {@code resourceId} to {@code lineage} under this file system's scheme.
+   *
+   * <p>The URI authority, when present and non-empty, is reported as the first segment. Unless
+   * {@code level} is {@link LineageLevel#TOP_LEVEL}, the URI path is appended as a further
+   * segment, provided it is neither empty nor {@code "/"}; {@code "/"} is passed as the
+   * last-segment separator.
+   *
+   * <p>Nothing is reported when neither segment applies, because a bare scheme does not identify
+   * any resource.
+   */
+  @Override
+  protected void reportLineage(HadoopResourceId resourceId, Lineage lineage, LineageLevel level) {
+    URI uri = resourceId.toPath().toUri();
+    ImmutableList.Builder<String> segments = ImmutableList.builder();
+    String authority = uri.getAuthority();
+    if (authority != null && !authority.isEmpty()) {
+      segments.add(authority);
+    }
+    String path = uri.getPath();
+    if (level != LineageLevel.TOP_LEVEL
+        && path != null
+        && !path.isEmpty()
+        && !path.equals("/")) {
+      segments.add(path);
+    }
+    ImmutableList<String> builtSegments = segments.build();
+    if (builtSegments.isEmpty()) {
+      // Only the scheme is known; reporting it alone would not name a concrete resource.
+      return;
+    }
+    lineage.add(scheme, builtSegments, "/");
+  }
+
/** An adapter around {@link FSDataInputStream} that implements {@link
SeekableByteChannel}. */
private static class HadoopSeekableByteChannel implements
SeekableByteChannel {
private final FileStatus fileStatus;
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index c5918a8c9aa..e0b299401c7 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -24,6 +24,9 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.io.FileNotFoundException;
import java.io.InputStream;
@@ -37,12 +40,14 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -481,6 +486,21 @@ public class HadoopFileSystemTest {
p.run();
}
+  @Test
+  public void testReportLineage() {
+    verifyLineage(
+        "hdfs://namenode/path/to/file.txt",
+        FileSystem.LineageLevel.FILE,
+        ImmutableList.of("namenode", "/path/to/file.txt"));
+    verifyLineage(
+        "hdfs://namenode/", FileSystem.LineageLevel.FILE, ImmutableList.of("namenode"));
+    verifyLineage(
+        "hdfs://namenode", FileSystem.LineageLevel.FILE, ImmutableList.of("namenode"));
+  }
+
+  @Test
+  public void testReportLineageTopLevel() {
+    // At top level only the authority is reported; the path segment is dropped.
+    verifyLineage(
+        "hdfs://namenode/path/to/file.txt",
+        FileSystem.LineageLevel.TOP_LEVEL,
+        ImmutableList.of("namenode"));
+  }
+
+  /**
+   * Reports {@code uri} at {@code level} through {@code fileSystem} and verifies that a mocked
+   * {@link Lineage} received exactly one {@code add("hdfs", expected, "/")} call.
+   */
+  private void verifyLineage(String uri, FileSystem.LineageLevel level, List<String> expected) {
+    HadoopResourceId resourceId = new HadoopResourceId(URI.create(uri));
+    Lineage mockLineage = mock(Lineage.class);
+    fileSystem.reportLineage(resourceId, mockLineage, level);
+    verify(mockLineage, times(1)).add("hdfs", expected, "/");
+  }
+
private void create(String relativePath, byte[] contents) throws Exception {
try (WritableByteChannel channel =
fileSystem.create(
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
index 0412e4286bb..501040928e4 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
@@ -34,6 +34,7 @@ import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -54,6 +55,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.hadoop.WritableCoder;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
@@ -97,6 +99,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@@ -725,6 +728,7 @@ public class HadoopFormatIO {
return ImmutableList.of(this);
}
computeSplitsIfNecessary();
+ reportSourceLineage(inputSplits);
LOG.info(
"Generated {} splits. Size of first split is {} ",
inputSplits.size(),
@@ -744,6 +748,27 @@ public class HadoopFormatIO {
.collect(Collectors.toList());
}
+  /**
+   * Reports every distinct file read by {@code splits} as a source in {@link Lineage}.
+   *
+   * <p>Only {@link FileSplit}s contribute; other split types are ignored. A file cut into several
+   * splits is reported once rather than once per split.
+   *
+   * <p>NOTE(review): the number of distinct files is not capped. Consider falling back to
+   * directory-level lineage for inputs that contain very many files.
+   *
+   * @param splits the computed input splits of this source
+   */
+  private void reportSourceLineage(final List<SerializableSplit> splits) {
+    splits.stream()
+        .map(SerializableSplit::getSplit)
+        .filter(split -> split instanceof FileSplit)
+        .map(split -> ((FileSplit) split).getPath().toUri())
+        .distinct()
+        .forEach(this::reportFileLineage);
+  }
+
+  /**
+   * Reports one file URI as a lineage source.
+   *
+   * <p>The authority, when non-empty, is the first segment. The path, unless empty or {@code "/"},
+   * is the next one, and {@code "/"} is the last-segment separator. URIs without a scheme, and URIs
+   * that yield no segment, are skipped.
+   */
+  private void reportFileLineage(URI uri) {
+    String scheme = uri.getScheme();
+    if (scheme == null) {
+      return;
+    }
+    ImmutableList.Builder<String> segments = ImmutableList.builder();
+    String authority = uri.getAuthority();
+    if (authority != null && !authority.isEmpty()) {
+      segments.add(authority);
+    }
+    String path = uri.getPath();
+    if (path != null && !path.isEmpty() && !path.equals("/")) {
+      segments.add(path);
+    }
+    ImmutableList<String> builtSegments = segments.build();
+    if (!builtSegments.isEmpty()) {
+      Lineage.getSources().add(scheme, builtSegments, "/");
+    }
+  }
+
@Override
public long getEstimatedSizeBytes(PipelineOptions po) throws Exception {
if (inputSplit == null) {