This is an automated email from the ASF dual-hosted git repository.

chamikaramj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 782953e3422 Add Delta Lake source to the Java Managed API (#38902)
782953e3422 is described below

commit 782953e342243e04f0c6b6d05b926c3ab1ef49a8
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Thu Jun 11 09:56:42 2026 -0700

    Add Delta Lake source to the Java Managed API (#38902)
---
 .../model/pipeline/v1/external_transforms.proto    |   2 +
 sdks/java/io/delta/build.gradle                    |   1 +
 .../io/delta/DeltaReadSchemaTransformProvider.java | 142 +++++++++++++++++++++
 .../org/apache/beam/sdk/io/delta/DeltaIOTest.java  |  41 ++++++
 .../java/org/apache/beam/sdk/managed/Managed.java  |   4 +
 5 files changed, 190 insertions(+)

diff --git 
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
 
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index 043a72dd34f..c73986eed48 100644
--- 
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ 
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -88,6 +88,8 @@ message ManagedTransforms {
       "beam:schematransform:org.apache.beam:sql_server_read:v1"];
     SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
       "beam:schematransform:org.apache.beam:sql_server_write:v1"];
+    DELTA_LAKE_READ = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:delta_lake_read:v1"];
   }
 }
 
diff --git a/sdks/java/io/delta/build.gradle b/sdks/java/io/delta/build.gradle
index c07aef6981b..57b1cd8ad87 100644
--- a/sdks/java/io/delta/build.gradle
+++ b/sdks/java/io/delta/build.gradle
@@ -33,6 +33,7 @@ def parquet_version = "1.16.0"
 
 dependencies {
     implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation project(path: ":model:pipeline", configuration: "shadow")
     implementation library.java.delta_kernel_api
     implementation library.java.delta_kernel_defaults
 
diff --git 
a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java
 
b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java
new file mode 100644
index 00000000000..42ca3f24def
--- /dev/null
+++ 
b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.delta;
+
+import static 
org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
+import 
org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * SchemaTransform implementation for {@link DeltaIO#readRows}. Reads records from Delta Lake and
+ * outputs a {@link org.apache.beam.sdk.values.PCollection} of Beam {@link
+ * org.apache.beam.sdk.values.Row}s.
+ *
+ * <p>The transform does not consume any input collections; it applies the read at the pipeline
+ * root and emits a single output collection tagged {@value #OUTPUT_TAG}. The provider identifies
+ * itself with the {@code DELTA_LAKE_READ} managed-transform URN.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class DeltaReadSchemaTransformProvider extends TypedSchemaTransformProvider<Configuration> {
+  /** Tag of the single entry in the {@link PCollectionRowTuple} produced by this transform. */
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected SchemaTransform from(Configuration configuration) {
+    return new DeltaReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /** Returns the URN of {@code ManagedTransforms.Urns.DELTA_LAKE_READ}. */
+  @Override
+  public String identifier() {
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.DELTA_LAKE_READ);
+  }
+
+  /** {@link SchemaTransform} that builds and applies a {@link DeltaIO.ReadRows} from a config. */
+  static class DeltaReadSchemaTransform extends SchemaTransform {
+    private final Configuration configuration;
+
+    /**
+     * @param configuration read configuration; must not be null
+     * @throws NullPointerException if {@code configuration} is null
+     */
+    DeltaReadSchemaTransform(Configuration configuration) {
+      this.configuration =
+          java.util.Objects.requireNonNull(configuration, "configuration cannot be null");
+    }
+
+    /**
+     * Returns the configuration converted to a {@link Row}, with fields sorted and field names
+     * converted to snake_case.
+     *
+     * @throws RuntimeException wrapping {@link NoSuchSchemaException} if no schema can be
+     *     obtained for {@link Configuration}
+     */
+    Row getConfigurationRow() {
+      try {
+        return SchemaRegistry.createDefault()
+            .getToRowFunction(Configuration.class)
+            .apply(configuration)
+            .sorted()
+            .toSnakeCase();
+      } catch (NoSuchSchemaException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * Reads the configured Delta Lake table. {@code input} is only used to obtain the pipeline.
+     *
+     * @return a tuple holding the read rows under {@link #OUTPUT_TAG}
+     */
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      // Read each optional value exactly once so the null check and the use see the same value.
+      // The nullness checker does not refine the result of a repeated (non-@Pure) getter call.
+      @Nullable Long version = configuration.getVersion();
+      @Nullable String timestamp = configuration.getTimestamp();
+      @Nullable Map<String, String> hadoopConfig = configuration.getHadoopConfig();
+
+      DeltaIO.ReadRows read = DeltaIO.readRows().from(configuration.getTable());
+      // NOTE(review): when both version and timestamp are set, both are forwarded to DeltaIO;
+      // confirm how DeltaIO resolves that combination.
+      if (version != null) {
+        read = read.withVersion(version);
+      }
+      if (timestamp != null) {
+        read = read.withTimestamp(timestamp);
+      }
+      if (hadoopConfig != null) {
+        read = read.withConfig(hadoopConfig);
+      }
+
+      PCollection<Row> output = input.getPipeline().apply(read);
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+    }
+  }
+
+  /**
+   * Configuration for {@link DeltaReadSchemaTransformProvider}. Only {@code table} is required;
+   * the remaining fields are optional.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Configuration {
+    static Builder builder() {
+      return new AutoValue_DeltaReadSchemaTransformProvider_Configuration.Builder();
+    }
+
+    @SchemaFieldDescription("Identifier of the Delta Lake table.")
+    abstract String getTable();
+
+    @SchemaFieldDescription("Version of the Delta Lake table to read.")
+    @Nullable
+    abstract Long getVersion();
+
+    @SchemaFieldDescription("Timestamp of the Delta Lake table to read.")
+    @Nullable
+    abstract String getTimestamp();
+
+    @SchemaFieldDescription("Properties passed to the Hadoop Configuration.")
+    @Nullable
+    abstract Map<String, String> getHadoopConfig();
+
+    /** AutoValue builder for {@link Configuration}. */
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setTable(String table);
+
+      abstract Builder setVersion(@Nullable Long version);
+
+      abstract Builder setTimestamp(@Nullable String timestamp);
+
+      abstract Builder setHadoopConfig(@Nullable Map<String, String> hadoopConfig);
+
+      abstract Configuration build();
+    }
+  }
+}
diff --git 
a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java
 
b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java
index ef6dd660c60..bd8bf8b3c8c 100644
--- 
a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java
+++ 
b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.Compression;
 import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.delta.DeltaIO.ReadRows;
 import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.managed.Managed;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -52,6 +53,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -387,6 +389,45 @@ public class DeltaIOTest {
     return java.nio.file.Files.readAllBytes(file.toPath());
   }
 
+  @Test
+  public void testManagedDeltaRead() throws Exception {
+    File tableDir = tempFolder.newFolder("managed-delta-table");
+
+    // 1. Write the single Parquet data file that the Delta log below references.
+    Schema schema = Schema.builder().addField("name", Schema.FieldType.STRING).build();
+    Row row = Row.withSchema(schema).addValues("test-name").build();
+    File parquetFile = new File(tableDir, "part-00000.parquet");
+    writeParquetFile(parquetFile, row);
+
+    // 2. Create a minimal Delta log for version 0: protocol, metadata and one "add" action.
+    // createDirectories throws on failure instead of silently returning false like mkdirs().
+    File logDir = new File(tableDir, "_delta_log");
+    Files.createDirectories(logDir.toPath());
+    File commitFile = new File(logDir, "00000000000000000000.json");
+
+    String commitContent =
+        "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n"
+            + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[],\"configuration\":{},\"createdAt\":123456789}}\n"
+            + "{\"add\":{\"path\":\"part-00000.parquet\",\"partitionValues\":{},\"size\":"
+            // The "add" action's size is the data file's byte length; no need to read it in.
+            + parquetFile.length()
+            + ",\"modificationTime\":123456789,\"dataChange\":true}}";
+
+    Files.write(commitFile.toPath(), commitContent.getBytes(StandardCharsets.UTF_8));
+
+    // 3. Read the table through the Managed API, supplying only the required "table" option.
+    PCollection<Row> output =
+        readPipeline
+            .apply(
+                Managed.read(Managed.DELTA_LAKE)
+                    .withConfig(ImmutableMap.of("table", tableDir.getAbsolutePath())))
+            .getSinglePCollection();
+
+    PAssert.that(output).containsInAnyOrder(row);
+
+    readPipeline.run().waitUntilFinish();
+  }
+
   @Test
   @org.junit.Ignore("Manual integration test with external local table")
   public void testReadingLocalTable() throws Exception {
diff --git 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index a5e7d879b44..9589992e079 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -93,6 +93,7 @@ public class Managed {
 
   // TODO: Dynamically generate a list of supported transforms
   public static final String ICEBERG = "iceberg";
+  public static final String DELTA_LAKE = "delta";
   public static final String ICEBERG_CDC = "iceberg_cdc";
   public static final String KAFKA = "kafka";
   public static final String BIGQUERY = "bigquery";
@@ -104,6 +105,7 @@ public class Managed {
   public static final Map<String, String> READ_TRANSFORMS =
       ImmutableMap.<String, String>builder()
           .put(ICEBERG, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ))
+          .put(DELTA_LAKE, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.DELTA_LAKE_READ))
           .put(ICEBERG_CDC, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_CDC_READ))
           .put(KAFKA, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
           .put(BIGQUERY, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
@@ -128,6 +130,8 @@ public class Managed {
    * <ul>
    *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg tables using <a
    *       href="https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/iceberg/IcebergIO.html">IcebergIO</a>
+   *   <li>{@link Managed#DELTA_LAKE} : Read from Delta Lake tables using <a
+   *       href="https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/delta/DeltaIO.html">DeltaIO</a>
    *   <li>{@link Managed#ICEBERG_CDC} : CDC Read from Apache Iceberg tables using <a
    *       href="https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/iceberg/IcebergIO.html">IcebergIO</a>
    *   <li>{@link Managed#KAFKA} : Read from Apache Kafka topics using <a

Reply via email to