This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 54673996c9b [Java] ManagedIO  (#30808)
54673996c9b is described below

commit 54673996c9bf2ee076b04833bbae2729d6cebbaf
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Mon Apr 8 06:55:17 2024 -0400

    [Java] ManagedIO  (#30808)
    
    * managed api for java
    
    * yaml utils
---
 build.gradle.kts                                   |   1 +
 sdks/java/core/build.gradle                        |   1 +
 .../apache/beam/sdk/schemas/utils/YamlUtils.java   | 171 ++++++++++++++++
 .../org/apache/beam/sdk/util/YamlUtilsTest.java    | 228 +++++++++++++++++++++
 sdks/java/managed/build.gradle                     |  37 ++++
 .../java/org/apache/beam/sdk/managed/Managed.java  | 195 ++++++++++++++++++
 .../managed/ManagedSchemaTransformProvider.java    | 183 +++++++++++++++++
 .../org/apache/beam/sdk/managed/package-info.java  |  20 ++
 .../ManagedSchemaTransformProviderTest.java        | 103 ++++++++++
 .../org/apache/beam/sdk/managed/ManagedTest.java   | 114 +++++++++++
 .../sdk/managed/TestSchemaTransformProvider.java   |  98 +++++++++
 .../managed/src/test/resources/test_config.yaml    |  21 ++
 settings.gradle.kts                                |   2 +
 13 files changed, 1174 insertions(+)

diff --git a/build.gradle.kts b/build.gradle.kts
index ded692677b5..9c42ffdc8ce 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -303,6 +303,7 @@ tasks.register("javaPreCommit") {
   dependsOn(":sdks:java:io:synthetic:build")
   dependsOn(":sdks:java:io:xml:build")
   dependsOn(":sdks:java:javadoc:allJavadoc")
+  dependsOn(":sdks:java:managed:build")
   dependsOn(":sdks:java:testing:expansion-service:build")
   dependsOn(":sdks:java:testing:jpms-tests:build")
   dependsOn(":sdks:java:testing:load-tests:build")
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index 438a3fb1806..5a47cb5237e 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -98,6 +98,7 @@ dependencies {
   permitUnusedDeclared 
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
   provided library.java.json_org
   implementation library.java.everit_json_schema
+  implementation "org.yaml:snakeyaml:2.0"
   shadowTest library.java.everit_json_schema
   provided library.java.junit
   testImplementation "com.github.stefanbirkner:system-rules:1.19.0"
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
new file mode 100644
index 00000000000..5c05b2bed39
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static org.apache.beam.sdk.values.Row.toRow;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.yaml.snakeyaml.Yaml;
+
+public class YamlUtils {
+  private static final Map<Schema.TypeName, Function<String, @Nullable 
Object>> YAML_VALUE_PARSERS =
+      ImmutableMap
+          .<Schema.TypeName,
+              Function<String, 
@org.checkerframework.checker.nullness.qual.Nullable Object>>
+              builder()
+          .put(Schema.TypeName.BYTE, Byte::valueOf)
+          .put(Schema.TypeName.INT16, Short::valueOf)
+          .put(Schema.TypeName.INT32, Integer::valueOf)
+          .put(Schema.TypeName.INT64, Long::valueOf)
+          .put(Schema.TypeName.FLOAT, Float::valueOf)
+          .put(Schema.TypeName.DOUBLE, Double::valueOf)
+          .put(Schema.TypeName.DECIMAL, BigDecimal::new)
+          .put(Schema.TypeName.BOOLEAN, Boolean::valueOf)
+          .put(Schema.TypeName.STRING, str -> str)
+          .put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str))
+          .build();
+
+  public static Row toBeamRow(@Nullable String yamlString, Schema schema) {
+    return toBeamRow(yamlString, schema, false);
+  }
+
+  public static Row toBeamRow(
+      @Nullable String yamlString, Schema schema, boolean 
convertNamesToCamelCase) {
+    if (yamlString == null || yamlString.isEmpty()) {
+      List<Field> requiredFields =
+          schema.getFields().stream()
+              .filter(field -> !field.getType().getNullable())
+              .collect(Collectors.toList());
+      if (requiredFields.isEmpty()) {
+        return Row.nullRow(schema);
+      } else {
+        throw new IllegalArgumentException(
+            String.format(
+                "Received an empty YAML string, but output schema contains 
required fields: %s",
+                requiredFields));
+      }
+    }
+    Yaml yaml = new Yaml();
+    Object yamlMap = yaml.load(yamlString);
+
+    Preconditions.checkArgument(
+        yamlMap instanceof Map,
+        "Expected a YAML mapping but got type '%s' instead.",
+        Preconditions.checkNotNull(yamlMap).getClass());
+
+    return toBeamRow(
+        (Map<String, Object>) Preconditions.checkNotNull(yamlMap), schema, 
convertNamesToCamelCase);
+  }
+
+  private static @Nullable Object toBeamValue(
+      Field field, @Nullable Object yamlValue, boolean 
convertNamesToCamelCase) {
+    FieldType fieldType = field.getType();
+
+    if (yamlValue == null) {
+      if (fieldType.getNullable()) {
+        return null;
+      } else {
+        throw new IllegalArgumentException(
+            "Received null value for non-nullable field \"" + field.getName() 
+ "\"");
+      }
+    }
+
+    if (yamlValue instanceof String
+        || yamlValue instanceof Number
+        || yamlValue instanceof Boolean) {
+      String yamlStringValue = yamlValue.toString();
+      if (YAML_VALUE_PARSERS.containsKey(fieldType.getTypeName())) {
+        return 
YAML_VALUE_PARSERS.get(fieldType.getTypeName()).apply(yamlStringValue);
+      }
+    }
+
+    if (yamlValue instanceof byte[] && fieldType.getTypeName() == 
Schema.TypeName.BYTES) {
+      return yamlValue;
+    }
+
+    if (yamlValue instanceof List) {
+      FieldType innerType =
+          Preconditions.checkNotNull(
+              fieldType.getCollectionElementType(),
+              "Cannot convert YAML type '%s` to `%s` because the YAML value is 
a List, but the output schema field does not define a collection type.",
+              yamlValue.getClass(),
+              fieldType);
+      return ((List<Object>) yamlValue)
+          .stream()
+              .map(
+                  v ->
+                      Preconditions.checkNotNull(
+                          toBeamValue(field.withType(innerType), v, 
convertNamesToCamelCase)))
+              .collect(Collectors.toList());
+    }
+
+    if (yamlValue instanceof Map) {
+      if (fieldType.getTypeName() == Schema.TypeName.ROW) {
+        Schema nestedSchema =
+            Preconditions.checkNotNull(
+                fieldType.getRowSchema(),
+                "Received a YAML '%s' type, but output schema field '%s' does 
not define a Row Schema",
+                yamlValue.getClass(),
+                fieldType);
+        return toBeamRow((Map<String, Object>) yamlValue, nestedSchema, 
convertNamesToCamelCase);
+      } else if (fieldType.getTypeName() == Schema.TypeName.MAP) {
+        return yamlValue;
+      }
+    }
+
+    throw new UnsupportedOperationException(
+        String.format(
+            "Converting YAML type '%s' to '%s' is not supported", 
yamlValue.getClass(), fieldType));
+  }
+
+  @SuppressWarnings("nullness")
+  public static Row toBeamRow(Map<String, Object> yamlMap, Schema rowSchema, 
boolean toCamelCase) {
+    return rowSchema.getFields().stream()
+        .map(
+            field ->
+                toBeamValue(
+                    field,
+                    yamlMap.get(maybeGetSnakeCase(field.getName(), 
toCamelCase)),
+                    toCamelCase))
+        .collect(toRow(rowSchema));
+  }
+
+  private static String maybeGetSnakeCase(String str, boolean getSnakeCase) {
+    return getSnakeCase ? 
CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, str) : str;
+  }
+
+  public static String yamlStringFromMap(@Nullable Map<String, Object> map) {
+    if (map == null || map.isEmpty()) {
+      return "";
+    }
+    return new Yaml().dumpAsMap(map);
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java
new file mode 100644
index 00000000000..6e6984dde3a
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class YamlUtilsTest {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  public String makeNested(String input) {
+    return Arrays.stream(input.split("\n"))
+        .map(str -> "  " + str)
+        .collect(Collectors.joining("\n"));
+  }
+
+  @Test
+  public void testEmptyYamlString() {
+    Schema schema = Schema.builder().build();
+
+    assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow("", schema));
+  }
+
+  @Test
+  public void testInvalidEmptyYamlWithNonEmptySchema() {
+    Schema schema = Schema.builder().addStringField("dummy").build();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Received an empty YAML string, but output schema contains required 
fields");
+    thrown.expectMessage("dummy");
+
+    YamlUtils.toBeamRow("", schema);
+  }
+
+  @Test
+  public void testNullableValues() {
+    String yamlString = "nullable_string:\n" + "nullable_integer:\n" + 
"nullable_boolean:\n";
+    Schema schema =
+        Schema.builder()
+            .addNullableStringField("nullable_string")
+            .addNullableInt32Field("nullable_integer")
+            .addNullableBooleanField("nullable_boolean")
+            .build();
+
+    assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema));
+  }
+
+  @Test
+  public void testMissingNullableValues() {
+    String yamlString = "nullable_string:";
+    Schema schema =
+        Schema.builder()
+            .addNullableStringField("nullable_string")
+            .addNullableInt32Field("nullable_integer")
+            .addNullableBooleanField("nullable_boolean")
+            .build();
+
+    assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema));
+  }
+
+  @Test
+  public void testInvalidNullableValues() {
+    String yamlString = "nullable_string:\n" + "integer:";
+    Schema schema =
+        
Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Received null value for non-nullable field 
\"integer\"");
+    YamlUtils.toBeamRow(yamlString, schema);
+  }
+
+  @Test
+  public void testInvalidMissingRequiredValues() {
+    String yamlString = "nullable_string:";
+    Schema schema =
+        
Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build();
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Received null value for non-nullable field 
\"integer\"");
+
+    YamlUtils.toBeamRow(yamlString, schema);
+  }
+
+  @Test
+  public void testExtraFieldsAreIgnored() {
+    String yamlString = "field1: val1\n" + "field2: val2";
+    Schema schema = Schema.builder().addStringField("field1").build();
+    Row expectedRow = Row.withSchema(schema).withFieldValue("field1", 
"val1").build();
+
+    assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema));
+  }
+
+  @Test
+  public void testInvalidTopLevelArray() {
+    String invalidYaml = "- top_level_list" + "- another_list";
+    Schema schema = Schema.builder().build();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Expected a YAML mapping");
+    YamlUtils.toBeamRow(invalidYaml, schema);
+  }
+
+  private static final Schema FLAT_SCHEMA =
+      Schema.builder()
+          .addByteField("byte_field")
+          .addInt16Field("int16_field")
+          .addInt32Field("int32_field")
+          .addInt64Field("int64_field")
+          .addFloatField("float_field")
+          .addDoubleField("double_field")
+          .addDecimalField("decimal_field")
+          .addBooleanField("boolean_field")
+          .addStringField("string_field")
+          .addByteArrayField("bytes_field")
+          .build();
+
+  private static final Row FLAT_ROW =
+      Row.withSchema(FLAT_SCHEMA)
+          .withFieldValue("byte_field", Byte.valueOf("123"))
+          .withFieldValue("int16_field", Short.valueOf("16"))
+          .withFieldValue("int32_field", 32)
+          .withFieldValue("int64_field", 64L)
+          .withFieldValue("float_field", 123.456F)
+          .withFieldValue("double_field", 456.789)
+          .withFieldValue("decimal_field", BigDecimal.valueOf(789.123))
+          .withFieldValue("boolean_field", true)
+          .withFieldValue("string_field", "some string")
+          .withFieldValue("bytes_field", BaseEncoding.base64().decode("abc"))
+          .build();
+
+  private static final String FLAT_YAML =
+      "byte_field: 123\n"
+          + "int16_field: 16\n"
+          + "int32_field: 32\n"
+          + "int64_field: 64\n"
+          + "float_field: 123.456\n"
+          + "double_field: 456.789\n"
+          + "decimal_field: 789.123\n"
+          + "boolean_field: true\n"
+          + "string_field: some string\n"
+          + "bytes_field: abc";
+
+  @Test
+  public void testAllTypesFlat() {
+    assertEquals(FLAT_ROW, YamlUtils.toBeamRow(FLAT_YAML, FLAT_SCHEMA));
+  }
+
+  @Test
+  public void testAllTypesNested() {
+    String nestedFlatTypes = makeNested(FLAT_YAML);
+    String topLevelYaml = "top_string: abc\n" + "nested: \n" + nestedFlatTypes;
+
+    Schema schema =
+        Schema.builder().addStringField("top_string").addRowField("nested", 
FLAT_SCHEMA).build();
+    Row expectedRow =
+        Row.withSchema(schema)
+            .withFieldValue("top_string", "abc")
+            .withFieldValue("nested", FLAT_ROW)
+            .build();
+
+    assertEquals(expectedRow, YamlUtils.toBeamRow(topLevelYaml, schema));
+  }
+
+  private static final String INT_ARRAY_YAML =
+      "arr:\n" + "  - 1\n" + "  - 2\n" + "  - 3\n" + "  - 4\n" + "  - 5\n";
+
+  private static final Schema INT_ARRAY_SCHEMA =
+      Schema.builder().addArrayField("arr", Schema.FieldType.INT32).build();
+
+  private static final Row INT_ARRAY_ROW =
+      Row.withSchema(INT_ARRAY_SCHEMA)
+          .withFieldValue("arr", IntStream.range(1, 
6).boxed().collect(Collectors.toList()))
+          .build();
+
+  @Test
+  public void testArray() {
+    assertEquals(INT_ARRAY_ROW, YamlUtils.toBeamRow(INT_ARRAY_YAML, 
INT_ARRAY_SCHEMA));
+  }
+
+  @Test
+  public void testNestedArray() {
+    String nestedArray = makeNested(INT_ARRAY_YAML);
+    String yamlString = "str_field: some string\n" + "nested: \n" + 
nestedArray;
+
+    Schema schema =
+        Schema.builder()
+            .addStringField("str_field")
+            .addRowField("nested", INT_ARRAY_SCHEMA)
+            .build();
+
+    Row expectedRow =
+        Row.withSchema(schema)
+            .withFieldValue("str_field", "some string")
+            .withFieldValue("nested", INT_ARRAY_ROW)
+            .build();
+
+    assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema));
+  }
+}
diff --git a/sdks/java/managed/build.gradle b/sdks/java/managed/build.gradle
new file mode 100644
index 00000000000..88e537d66f8
--- /dev/null
+++ b/sdks/java/managed/build.gradle
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'org.apache.beam.module'
+applyJavaNature(
+        automaticModuleName: 'org.apache.beam.sdk.managed',
+)
+
+
+description = "Apache Beam :: SDKs :: Java :: Managed"
+ext.summary = """Library that provides managed IOs."""
+
+
+dependencies {
+    implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation library.java.vendored_guava_32_1_2_jre
+//    implementation library.java.vendored_grpc_1_60_1
+
+    testImplementation library.java.junit
+    testRuntimeOnly "org.yaml:snakeyaml:2.0"
+    testRuntimeOnly project(path: ":runners:direct-java", configuration: 
"shadow")
+}
diff --git 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
new file mode 100644
index 00000000000..b2b010b1e43
--- /dev/null
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and 
instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Managed#read(String)} and
+ * {@link Managed#write(String)}. Each one
+ * enumerates its available transforms in a {@code READ_TRANSFORMS} or {@code WRITE_TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which 
means each one has a
+ * defined configuration. A given transform can be built with a {@code 
Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = PCollectionRowTuple.empty(pipeline).apply(
+ *       Managed.read(ICEBERG)
+ *           .withConfig(ImmutableMap.<String, Object>builder()
+ *               .put("foo", "abc")
+ *               .put("bar", 123)
+ *               .build()));
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one 
can provide the
+ * location to a YAML file that contains this information. Say we have the 
following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple input = PCollectionRowTuple.of("input", 
pipeline.apply(Create.of(...)));
+ *
+ * PCollectionRowTuple output = input.apply(
+ *     Managed.write(ICEBERG)
+ *         .withConfigUrl(<config path>));
+ * }</pre>
+ */
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  public static final Map<String, String> READ_TRANSFORMS =
+      ImmutableMap.<String, String>builder()
+          .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+          .build();
+  public static final Map<String, String> WRITE_TRANSFORMS =
+      ImmutableMap.<String, String>builder()
+          .put(ICEBERG, 
"beam:schematransform:org.apache.beam:iceberg_write:v1")
+          .build();
+
+  /**
+   * Instantiates a {@link ManagedTransform} for reading from the specified 
source. The supported managed
+   * sources are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+   * </ul>
+   */
+  public static ManagedTransform read(String source) {
+
+    return new AutoValue_Managed_ManagedTransform.Builder()
+        .setIdentifier(
+            Preconditions.checkNotNull(
+                READ_TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one 
of the following sources: %s",
+                source,
+                READ_TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(READ_TRANSFORMS.values()))
+        .build();
+  }
+
+  /**
+   * Instantiates a {@link ManagedTransform} for writing to the specified 
sink. The supported managed
+   * sinks are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+   * </ul>
+   */
+  public static ManagedTransform write(String sink) {
+    return new AutoValue_Managed_ManagedTransform.Builder()
+        .setIdentifier(
+            Preconditions.checkNotNull(
+                WRITE_TRANSFORMS.get(sink.toLowerCase()),
+                "An unsupported sink was specified: '%s'. Please specify one 
of the following sinks: %s",
+                sink,
+                WRITE_TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(WRITE_TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class ManagedTransform extends SchemaTransform {
+    abstract String getIdentifier();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    @VisibleForTesting
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setIdentifier(String identifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      @VisibleForTesting
+      abstract Builder setSupportedIdentifiers(List<String> 
supportedIdentifiers);
+
+      abstract ManagedTransform build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate 
the underlying
+     * transform. The map can ignore nullable parameters, but needs to include 
all required
+     * parameters. Check the underlying transform's schema ({@link
+     * SchemaTransformProvider#configurationSchema()}) to see which parameters 
are available.
+     */
+    public ManagedTransform withConfig(Map<String, Object> config) {
+      return 
toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration 
arguments from a
+     * specified YAML file location.
+     */
+    public ManagedTransform withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    ManagedTransform withSupportedIdentifiers(List<String> 
supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()
+              .setTransformIdentifier(getIdentifier())
+              .setConfig(getConfig())
+              .setConfigUrl(getConfigUrl())
+              .build();
+
+      SchemaTransform underlyingTransform =
+          new 
ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+}
diff --git 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
new file mode 100644
index 00000000000..1ee2b11a90f
--- /dev/null
+++ 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.utils.YamlUtils;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+@AutoService(SchemaTransformProvider.class)
+public class ManagedSchemaTransformProvider
+    extends 
TypedSchemaTransformProvider<ManagedSchemaTransformProvider.ManagedConfig> {
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:managed:v1";
+  }
+
+  private final Map<String, SchemaTransformProvider> schemaTransformProviders 
= new HashMap<>();
+
+  public ManagedSchemaTransformProvider() {}
+
+  ManagedSchemaTransformProvider(Collection<String> supportedIdentifiers) {
+    try {
+      for (SchemaTransformProvider schemaTransformProvider :
+          ServiceLoader.load(SchemaTransformProvider.class)) {
+        if 
(schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) {
+          throw new IllegalArgumentException(
+              "Found multiple SchemaTransformProvider implementations with the 
same identifier "
+                  + schemaTransformProvider.identifier());
+        }
+        schemaTransformProviders.put(schemaTransformProvider.identifier(), 
schemaTransformProvider);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e.getMessage());
+    }
+
+    schemaTransformProviders.entrySet().removeIf(e -> 
!supportedIdentifiers.contains(e.getKey()));
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  @VisibleForTesting
+  abstract static class ManagedConfig {
+    public static Builder builder() {
+      return new 
AutoValue_ManagedSchemaTransformProvider_ManagedConfig.Builder();
+    }
+
+    @SchemaFieldDescription("Identifier of the underlying IO to instantiate.")
+    public abstract String getTransformIdentifier();
+
+    @SchemaFieldDescription("URL path to the YAML config file used to build 
the underlying IO.")
+    public abstract @Nullable String getConfigUrl();
+
+    @SchemaFieldDescription("YAML string config used to build the underlying 
IO.")
+    public abstract @Nullable String getConfig();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setTransformIdentifier(String identifier);
+
+      public abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      public abstract Builder setConfig(@Nullable String config);
+
+      public abstract ManagedConfig build();
+    }
+
+    protected void validate() {
+      boolean configExists = !Strings.isNullOrEmpty(getConfig());
+      boolean configUrlExists = !Strings.isNullOrEmpty(getConfigUrl());
+      checkArgument(
+          !(configExists && configUrlExists) && (configExists || 
configUrlExists),
+          "Please specify a config or a config URL, but not both.");
+    }
+  }
+
+  @Override
+  protected SchemaTransform from(ManagedConfig managedConfig) {
+    managedConfig.validate();
+    SchemaTransformProvider schemaTransformProvider =
+        Preconditions.checkNotNull(
+            
schemaTransformProviders.get(managedConfig.getTransformIdentifier()),
+            "Could not find transform with identifier %s, or it may not be 
supported",
+            managedConfig.getTransformIdentifier());
+
+    // parse config before expansion to check if it matches underlying 
transform's config schema
+    Schema transformConfigSchema = 
schemaTransformProvider.configurationSchema();
+    Row transformConfig;
+    try {
+      transformConfig = getRowConfig(managedConfig, transformConfigSchema);
+    } catch (Exception e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Specified configuration does not align with the underlying 
transform's configuration schema [%s].",
+              transformConfigSchema),
+          e);
+    }
+
+    return new ManagedSchemaTransform(transformConfig, 
schemaTransformProvider);
+  }
+
+  private static class ManagedSchemaTransform extends SchemaTransform {
+    private final Row transformConfig;
+    private final SchemaTransformProvider underlyingTransformProvider;
+
+    ManagedSchemaTransform(
+        Row transformConfig, SchemaTransformProvider 
underlyingTransformProvider) {
+      this.transformConfig = transformConfig;
+      this.underlyingTransformProvider = underlyingTransformProvider;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      SchemaTransform underlyingTransform = 
underlyingTransformProvider.from(transformConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+
+  @VisibleForTesting
+  static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
+    String transformYamlConfig;
+    if (!Strings.isNullOrEmpty(config.getConfigUrl())) {
+      try {
+        MatchResult.Metadata fileMetaData =
+            
FileSystems.matchSingleFileSpec(Preconditions.checkNotNull(config.getConfigUrl()));
+        ByteBuffer buffer = ByteBuffer.allocate((int) 
fileMetaData.sizeBytes());
+        FileSystems.open(fileMetaData.resourceId()).read(buffer);
+        transformYamlConfig = new String(buffer.array(), 
StandardCharsets.UTF_8);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      transformYamlConfig = config.getConfig();
+    }
+
+    return YamlUtils.toBeamRow(transformYamlConfig, transformSchema, true);
+  }
+
+  @VisibleForTesting
+  Map<String, SchemaTransformProvider> getAllProviders() {
+    return schemaTransformProviders;
+  }
+}
diff --git 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java
new file mode 100644
index 00000000000..d129e4a7a22
--- /dev/null
+++ 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Managed reads and writes. */
+package org.apache.beam.sdk.managed;
diff --git 
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
 
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
new file mode 100644
index 00000000000..0c495d0d2c5
--- /dev/null
+++ 
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import static 
org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedConfig;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ManagedSchemaTransformProviderTest {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testFailWhenNoConfigSpecified() {
+    ManagedSchemaTransformProvider.ManagedConfig config =
+        ManagedSchemaTransformProvider.ManagedConfig.builder()
+            .setTransformIdentifier("some identifier")
+            .build();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Please specify a config or a config URL, but not 
both");
+    config.validate();
+  }
+
+  @Test
+  public void testGetRowFromYamlConfig() {
+    String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+    ManagedConfig config =
+        ManagedConfig.builder()
+            .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
+            .setConfig(yamlString)
+            .build();
+    Schema configSchema = new 
TestSchemaTransformProvider().configurationSchema();
+    Row expectedRow =
+        Row.withSchema(configSchema)
+            .withFieldValue("extraString", "abc")
+            .withFieldValue("extraInteger", 123)
+            .build();
+    Row configRow =
+        ManagedSchemaTransformProvider.getRowConfig(
+            config, new TestSchemaTransformProvider().configurationSchema());
+
+    assertEquals(expectedRow, configRow);
+  }
+
+  @Test
+  public void testGetRowFromConfigUrl() throws URISyntaxException {
+    String yamlConfigPath =
+        
Paths.get(getClass().getClassLoader().getResource("test_config.yaml").toURI())
+            .toFile()
+            .getAbsolutePath();
+    ManagedConfig config =
+        ManagedConfig.builder()
+            .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
+            .setConfigUrl(yamlConfigPath)
+            .build();
+    Schema configSchema = new 
TestSchemaTransformProvider().configurationSchema();
+    Row expectedRow =
+        Row.withSchema(configSchema)
+            .withFieldValue("extraString", "abc")
+            .withFieldValue("extraInteger", 123)
+            .build();
+    Row configRow =
+        ManagedSchemaTransformProvider.getRowConfig(
+            config, new TestSchemaTransformProvider().configurationSchema());
+
+    assertEquals(expectedRow, configRow);
+  }
+
+  @Test
+  public void testDiscoverTestProvider() {
+    ManagedSchemaTransformProvider provider =
+        new 
ManagedSchemaTransformProvider(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER));
+
+    
assertTrue(provider.getAllProviders().containsKey(TestSchemaTransformProvider.IDENTIFIER));
+  }
+}
diff --git 
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java 
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
new file mode 100644
index 00000000000..ceb71a06f33
--- /dev/null
+++ 
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ManagedTest {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testInvalidTransform() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("An unsupported source was specified");
+    Managed.read("nonexistent-source");
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("An unsupported sink was specified");
+    Managed.write("nonexistent-sink");
+  }
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  private static final Schema SCHEMA =
+      Schema.builder().addStringField("str").addInt32Field("int").build();
+  private static final List<Row> ROWS =
+      Arrays.asList(
+          Row.withSchema(SCHEMA).withFieldValue("str", 
"a").withFieldValue("int", 1).build(),
+          Row.withSchema(SCHEMA).withFieldValue("str", 
"b").withFieldValue("int", 2).build(),
+          Row.withSchema(SCHEMA).withFieldValue("str", 
"c").withFieldValue("int", 3).build());
+
+  public void runTestProviderTest(Managed.ManagedTransform writeOp) {
+    PCollection<Row> rows =
+        PCollectionRowTuple.of("input", 
pipeline.apply(Create.of(ROWS)).setRowSchema(SCHEMA))
+            .apply(writeOp)
+            .get("output");
+
+    Schema outputSchema = rows.getSchema();
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            ROWS.stream()
+                .map(
+                    row ->
+                        Row.withSchema(outputSchema)
+                            .addValues(row.getValues())
+                            .addValue("abc")
+                            .addValue(123)
+                            .build())
+                .collect(Collectors.toList()));
+    pipeline.run();
+  }
+
+  @Test
+  public void testManagedTestProviderWithConfigMap() {
+    Managed.ManagedTransform writeOp =
+        Managed.write(Managed.ICEBERG)
+            .toBuilder()
+            .setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
+            .build()
+            
.withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
+            .withConfig(ImmutableMap.of("extra_string", "abc", 
"extra_integer", 123));
+
+    runTestProviderTest(writeOp);
+  }
+
+  @Test
+  public void testManagedTestProviderWithConfigFile() throws Exception {
+    String yamlConfigPath =
+        
Paths.get(getClass().getClassLoader().getResource("test_config.yaml").toURI())
+            .toFile()
+            .getAbsolutePath();
+
+    Managed.ManagedTransform writeOp =
+        Managed.write(Managed.ICEBERG)
+            .toBuilder()
+            .setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
+            .build()
+            
.withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
+            .withConfigUrl(yamlConfigPath);
+
+    runTestProviderTest(writeOp);
+  }
+}
diff --git 
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java
 
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java
new file mode 100644
index 00000000000..136d98d468d
--- /dev/null
+++ 
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+@AutoService(SchemaTransformProvider.class)
+public class TestSchemaTransformProvider
+    extends TypedSchemaTransformProvider<TestSchemaTransformProvider.Config> {
+  static final String IDENTIFIER = 
"beam:schematransform:org.apache.beam:test_transform:v1";
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Config {
+    public static Builder builder() {
+      return new AutoValue_TestSchemaTransformProvider_Config.Builder();
+    }
+
+    @SchemaFieldDescription("String to add to each row element.")
+    public abstract String getExtraString();
+
+    @SchemaFieldDescription("Integer to add to each row element.")
+    public abstract Integer getExtraInteger();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setExtraString(String extraString);
+
+      public abstract Builder setExtraInteger(Integer extraInteger);
+
+      public abstract Config build();
+    }
+  }
+
+  @Override
+  public SchemaTransform from(Config config) {
+    String extraString = config.getExtraString();
+    Integer extraInteger = config.getExtraInteger();
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        Schema schema =
+            Schema.builder()
+                .addFields(input.get("input").getSchema().getFields())
+                .addStringField("extra_string")
+                .addInt32Field("extra_integer")
+                .build();
+        PCollection<Row> rows =
+            input
+                .get("input")
+                .apply(
+                    MapElements.into(TypeDescriptors.rows())
+                        .via(
+                            row ->
+                                Row.withSchema(schema)
+                                    .addValues(row.getValues())
+                                    .addValue(extraString)
+                                    .addValue(extraInteger)
+                                    .build()))
+                .setRowSchema(schema);
+        return PCollectionRowTuple.of("output", rows);
+      }
+    };
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+}
diff --git a/sdks/java/managed/src/test/resources/test_config.yaml 
b/sdks/java/managed/src/test/resources/test_config.yaml
new file mode 100644
index 00000000000..7725c32b348
--- /dev/null
+++ b/sdks/java/managed/src/test/resources/test_config.yaml
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+extra_string: "abc"
+extra_integer: 123
\ No newline at end of file
diff --git a/settings.gradle.kts b/settings.gradle.kts
index ec11fd32fdd..1e52e425b21 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -353,3 +353,5 @@ include("sdks:java:io:kafka:kafka-100")
 findProject(":sdks:java:io:kafka:kafka-100")?.name = "kafka-100"
 include("sdks:java:io:kafka:kafka-01103")
 findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103"
+include("sdks:java:managed")
+findProject(":sdks:java:managed")?.name = "managed"

Reply via email to