This is an automated email from the ASF dual-hosted git repository.

derrickaw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 20ac8c4f35a add datadog io normalization and yaml (#38362)
20ac8c4f35a is described below

commit 20ac8c4f35a7662570ec098952474b3be8c503cc
Author: Derrick Williams <[email protected]>
AuthorDate: Wed Jun 10 15:51:32 2026 -0400

    add datadog io normalization and yaml (#38362)
    
    * draft files
    
    * currently works
    
    * initial test file
    
    * add more test cases
    
    * add doc strings and slight code improvements
    
    * adding more test cases and working out errorHandling
    
    * works with one test failure
    
    * updated error handling logic and add more tests
    
    * fix conflict with standard_io
    
    * all tests pass - need to add write verification through Datadog 
agent/store
    
    * first draft of yaml test file
    
    * some minor changes to schematransformer and write error schema; also 
adding initial yaml test file
    
    * working snapshot before simplification
    
    * combine errors
    
    * import constants
    
    * rollback DatadogIO cleanups
    
    * rollback DatadogWriteError cleanups
    
    * rollback SYNCHRONIZED_PROCESSING_TIME in timeutil.py
    
    * simplify expansion-service datadog dependency to runtimeOnly
    
    * revert some previous cleanup operations
    
    * restore proper trailing newline to build.gradle
    
    * fix lint issues
    
    * run standard external transform script
    
    * fix gemini review comments
    
    * update logic for better performance
    
    * fix yaml row failure
    
    * Trigger fresh CI/CD run
    
    * update coder
    
    * old design parts
    
    * revert datadogio coder change and try to cover in integration_tests
    
    * fix coder
    
    * fix lint
    
    * address more gemini comments
    
    * fix error output
    
    * fix another gemini review
    
    * remove some dead code
    
    * fix spotless
    
    * revert venv
    
    * address comments
    
    * address agent to fake server comment
    
    * format
    
    * address Cham's comments
---
 .../apache/beam/sdk/io/datadog/DatadogEvent.java   |   6 +
 .../DatadogWriteSchemaTransformConfiguration.java  | 114 +++++
 .../DatadogWriteSchemaTransformProvider.java       | 294 +++++++++++
 .../DatadogWriteSchemaTransformProviderTest.java   | 535 +++++++++++++++++++++
 sdks/java/io/expansion-service/build.gradle        |   1 +
 sdks/python/apache_beam/yaml/integration_tests.py  |  34 ++
 sdks/python/apache_beam/yaml/standard_io.yaml      |  21 +
 .../python/apache_beam/yaml/test_utils/__init__.py |  18 +
 .../yaml/test_utils/datadog_test_utils.py          | 131 +++++
 sdks/python/apache_beam/yaml/tests/datadog.yaml    |  63 +++
 sdks/standard_external_transforms.yaml             |  35 ++
 11 files changed, 1252 insertions(+)

diff --git 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java
 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java
index 80334b5e466..e9a2546d9d9 100644
--- 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java
+++ 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java
@@ -26,6 +26,12 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 @AutoValue
 public abstract class DatadogEvent {
 
+  public static final String SOURCE = "ddsource";
+  public static final String TAGS = "ddtags";
+  public static final String HOSTNAME = "hostname";
+  public static final String SERVICE = "service";
+  public static final String MESSAGE = "message";
+
   public static Builder newBuilder() {
     return new AutoValue_DatadogEvent.Builder();
   }
diff --git 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformConfiguration.java
 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformConfiguration.java
new file mode 100644
index 00000000000..059b3b33ed4
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformConfiguration.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+/**
+ * Configuration for writing to Datadog.
+ *
+ * <p>This class is meant to be used with {@link 
DatadogWriteSchemaTransformProvider}.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class DatadogWriteSchemaTransformConfiguration {
+
+  public void validate() {
+    String invalidConfigMessage = "Invalid Datadog Write configuration: ";
+    checkArgument(!getUrl().isEmpty(), invalidConfigMessage + "url must be specified.");
+    checkArgument(!getApiKey().isEmpty(), invalidConfigMessage + "apiKey must be specified.");
+    Integer batchCount = getBatchCount();
+    if (batchCount != null) {
+      checkArgument(batchCount > 0, invalidConfigMessage + "batchCount must be greater than 0.");
+    }
+    Integer minBatchCount = getMinBatchCount();
+    if (minBatchCount != null) {
+      checkArgument(
+          minBatchCount > 0, invalidConfigMessage + "minBatchCount must be greater than 0.");
+    }
+    Long maxBufferSize = getMaxBufferSize();
+    if (maxBufferSize != null) {
+      checkArgument(
+          maxBufferSize > 0, invalidConfigMessage + "maxBufferSize must be greater than 0.");
+    }
+    Integer parallelism = getParallelism();
+    if (parallelism != null) {
+      checkArgument(parallelism > 0, invalidConfigMessage + "parallelism must be greater than 0.");
+    }
+    ErrorHandling errorHandling = getErrorHandling();
+    if (errorHandling != null) {
+      checkArgument(
+          !Strings.isNullOrEmpty(errorHandling.getOutput()),
+          invalidConfigMessage + "Output must not be empty if error handling specified.");
+    }
+  }
+
+  /** Instantiates a {@link DatadogWriteSchemaTransformConfiguration.Builder} 
instance. */
+  public static DatadogWriteSchemaTransformConfiguration.Builder builder() {
+    return new AutoValue_DatadogWriteSchemaTransformConfiguration.Builder();
+  }
+
+  @SchemaFieldDescription("The Datadog API URL.")
+  public abstract String getUrl();
+
+  @SchemaFieldDescription("The Datadog API key.")
+  public abstract String getApiKey();
+
+  @SchemaFieldDescription("The minimum number of events to batch together for each write.")
+  public abstract @Nullable Integer getMinBatchCount();
+
+  @SchemaFieldDescription("The number of events to batch together for each write.")
+  public abstract @Nullable Integer getBatchCount();
+
+  @SchemaFieldDescription("The maximum buffer size in bytes.")
+  public abstract @Nullable Long getMaxBufferSize();
+
+  @SchemaFieldDescription("The degree of parallelism for writing.")
+  public abstract @Nullable Integer getParallelism();
+
+  @SchemaFieldDescription("Specifies how to handle errors.")
+  public abstract @Nullable ErrorHandling getErrorHandling();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setUrl(String url);
+
+    public abstract Builder setApiKey(String apiKey);
+
+    public abstract Builder setMinBatchCount(Integer minBatchCount);
+
+    public abstract Builder setBatchCount(Integer batchCount);
+
+    public abstract Builder setMaxBufferSize(Long maxBufferSize);
+
+    public abstract Builder setParallelism(Integer parallelism);
+
+    public abstract Builder setErrorHandling(@Nullable ErrorHandling 
errorHandling);
+
+    /** Builds the {@link DatadogWriteSchemaTransformConfiguration} 
configuration. */
+    public abstract DatadogWriteSchemaTransformConfiguration build();
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformProvider.java
 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformProvider.java
new file mode 100644
index 00000000000..d0013ac0422
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformProvider.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+@AutoService(SchemaTransformProvider.class)
+public class DatadogWriteSchemaTransformProvider
+    extends 
TypedSchemaTransformProvider<DatadogWriteSchemaTransformConfiguration> {
+  private static final String IDENTIFIER = 
"beam:schematransform:org.apache.beam:datadog_write:v1";
+  static final String INPUT = "input";
+  static final String OUTPUT = "output";
+  static final String ERROR = "errors";
+  public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Void> OUTPUT_TAG = new TupleTag<Void>() {};
+  public static final TupleTag<DatadogEvent> EVENT_TAG = new 
TupleTag<DatadogEvent>() {};
+
+  @Override
+  protected Class<DatadogWriteSchemaTransformConfiguration> 
configurationClass() {
+    return DatadogWriteSchemaTransformConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(DatadogWriteSchemaTransformConfiguration 
configuration) {
+    return new DatadogWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} input 
collection names method. */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} output 
collection names method. */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(ERROR);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for Datadog Write jobs 
configured using {@link
+   * DatadogWriteSchemaTransformConfiguration}.
+   */
+  static class DatadogWriteSchemaTransform extends SchemaTransform {
+    private final DatadogWriteSchemaTransformConfiguration configuration;
+
+    DatadogWriteSchemaTransform(DatadogWriteSchemaTransformConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      // Validate configuration parameters
+      configuration.validate();
+
+      // Obtain input rows
+      PCollection<Row> inputRows = input.get(INPUT);
+
+      // Check for errors
+      boolean handleErrors = 
ErrorHandling.hasOutput(configuration.getErrorHandling());
+
+      Schema inputSchema = inputRows.getSchema();
+      Schema dynamicErrorSchema =
+          Schema.builder()
+              .addNullableRowField("failed_row", inputSchema)
+              .addNullableField("payload", Schema.FieldType.STRING)
+              .addNullableField("statusCode", Schema.FieldType.INT32)
+              .addNullableField("statusMessage", Schema.FieldType.STRING)
+              .build();
+
+      PCollectionTuple convertResult =
+          inputRows.apply(
+              "Convert to DatadogEvent",
+              ParDo.of(new RowToEventFn(handleErrors, ERROR_TAG, 
dynamicErrorSchema))
+                  .withOutputTags(EVENT_TAG, TupleTagList.of(ERROR_TAG)));
+
+      PCollection<DatadogEvent> datadogEvents =
+          convertResult.get(EVENT_TAG).setCoder(DatadogEventCoder.of());
+      PCollection<Row> conversionErrors =
+          convertResult
+              .get(ERROR_TAG)
+              
.setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+      // Configure DatadogIO.Write
+      DatadogIO.Write.Builder builder =
+          DatadogIO.writeBuilder(configuration.getMinBatchCount())
+              .withUrl(configuration.getUrl())
+              .withApiKey(configuration.getApiKey());
+
+      Integer batchCount = configuration.getBatchCount();
+      if (batchCount != null) {
+        builder = builder.withBatchCount(batchCount);
+      }
+      Long maxBufferSize = configuration.getMaxBufferSize();
+      if (maxBufferSize != null) {
+        builder = builder.withMaxBufferSize(maxBufferSize);
+      }
+      Integer parallelism = configuration.getParallelism();
+      if (parallelism != null) {
+        builder = builder.withParallelism(parallelism);
+      }
+
+      DatadogIO.Write write = builder.build();
+
+      // Apply DatadogIO.Write
+      PCollection<DatadogWriteError> writeErrors = datadogEvents.apply("Write To Datadog", write);
+
+      // Handle errors
+      ErrorHandling errorHandling = configuration.getErrorHandling();
+      if (errorHandling != null) {
+        PCollection<Row> writeErrorRows =
+            writeErrors
+                .apply(
+                    "Convert Write Errors to Rows",
+                    org.apache.beam.sdk.transforms.MapElements.into(
+                            org.apache.beam.sdk.values.TypeDescriptors.rows())
+                        .via(
+                            error ->
+                                Row.withSchema(dynamicErrorSchema)
+                                    .addValue(null)
+                                    .addValue(error.payload())
+                                    .addValue(error.statusCode())
+                                    .addValue(error.statusMessage())
+                                    .build()))
+                
.setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+        PCollection<Row> allErrors =
+            org.apache.beam.sdk.values.PCollectionList.of(conversionErrors)
+                .and(writeErrorRows)
+                .apply("Flatten Errors", 
org.apache.beam.sdk.transforms.Flatten.pCollections())
+                
.setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+        return PCollectionRowTuple.of(errorHandling.getOutput(), allErrors);
+      } else {
+        writeErrors.apply("Fail on Write Error", ParDo.of(new 
FailOnWriteErrorFn()));
+        PCollection<Row> emptyErrors =
+            input
+                .getPipeline()
+                .apply("Empty Errors Placeholder", 
Create.empty(RowCoder.of(dynamicErrorSchema)));
+        return PCollectionRowTuple.of(ERROR, emptyErrors);
+      }
+    }
+  }
+
+  static final Schema WRITE_ERROR_SCHEMA =
+      Schema.builder()
+          .addNullableField("payload", Schema.FieldType.STRING)
+          .addNullableField("statusCode", Schema.FieldType.INT32)
+          .addNullableField("statusMessage", Schema.FieldType.STRING)
+          .build();
+
+  static final Schema DATADOG_EVENT_SCHEMA =
+      Schema.builder()
+          .addNullableField(DatadogEvent.SOURCE, Schema.FieldType.STRING)
+          .addNullableField(DatadogEvent.TAGS, Schema.FieldType.STRING)
+          .addNullableField(DatadogEvent.HOSTNAME, Schema.FieldType.STRING)
+          .addNullableField(DatadogEvent.SERVICE, Schema.FieldType.STRING)
+          .addNullableField(DatadogEvent.MESSAGE, Schema.FieldType.STRING)
+          .build();
+
+  static Row eventToRow(DatadogEvent event) {
+    return Row.withSchema(DATADOG_EVENT_SCHEMA)
+        .addValue(event.ddsource())
+        .addValue(event.ddtags())
+        .addValue(event.hostname())
+        .addValue(event.service())
+        .addValue(event.message())
+        .build();
+  }
+
+  static DatadogEvent rowToEvent(Row row) {
+    DatadogEvent.Builder builder = DatadogEvent.newBuilder();
+    Schema schema = row.getSchema();
+
+    String ddsource =
+        schema.hasField(DatadogEvent.SOURCE) ? 
row.getString(DatadogEvent.SOURCE) : null;
+    if (ddsource != null) {
+      builder.withSource(ddsource);
+    }
+    String ddtags = schema.hasField(DatadogEvent.TAGS) ? 
row.getString(DatadogEvent.TAGS) : null;
+    if (ddtags != null) {
+      builder.withTags(ddtags);
+    }
+    String hostname =
+        schema.hasField(DatadogEvent.HOSTNAME) ? 
row.getString(DatadogEvent.HOSTNAME) : null;
+    if (hostname != null) {
+      builder.withHostname(hostname);
+    }
+    String service =
+        schema.hasField(DatadogEvent.SERVICE) ? 
row.getString(DatadogEvent.SERVICE) : null;
+    if (service != null) {
+      builder.withService(service);
+    }
+    String message =
+        schema.hasField(DatadogEvent.MESSAGE) ? 
row.getString(DatadogEvent.MESSAGE) : null;
+    builder.withMessage(checkNotNull(message, "Message is required."));
+
+    return builder.build();
+  }
+
+  static class RowToEventFn extends DoFn<Row, DatadogEvent> {
+    private final boolean handleErrors;
+    private final TupleTag<Row> errorOutputTag;
+    private final Schema errorSchema;
+
+    RowToEventFn(boolean handleErrors, TupleTag<Row> errorOutputTag, Schema 
errorSchema) {
+      this.handleErrors = handleErrors;
+      this.errorOutputTag = errorOutputTag;
+      this.errorSchema = errorSchema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      try {
+        c.output(rowToEvent(c.element()));
+      } catch (Exception e) {
+        if (handleErrors) {
+          String rowString = c.element().toString();
+          String payload = rowString.length() <= 1024 ? rowString : 
rowString.substring(0, 1024);
+          c.output(
+              errorOutputTag,
+              Row.withSchema(errorSchema)
+                  .addValue(c.element())
+                  .addValue(payload)
+                  .addValue(java.net.HttpURLConnection.HTTP_BAD_REQUEST)
+                  .addValue(e.getMessage())
+                  .build());
+        } else {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * A {@link DoFn} that throws a {@link RuntimeException} when a write error 
is encountered,
+   * causing the pipeline to fail. This is the default error handling behavior 
when no error output
+   * is configured.
+   */
+  static class FailOnWriteErrorFn extends DoFn<DatadogWriteError, Void> {
+    @ProcessElement
+    public void processElement(@Element DatadogWriteError error) {
+      String message = error.statusMessage();
+      if (error.statusCode() != null) {
+        throw new RuntimeException(
+            String.format(
+                "Datadog write failed with status code %d: %s", 
error.statusCode(), message));
+      } else {
+        throw new RuntimeException("Datadog write failed: " + message);
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformProviderTest.java
 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformProviderTest.java
new file mode 100644
index 00000000000..534251fb4c1
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformProviderTest.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static 
org.apache.beam.sdk.io.datadog.DatadogWriteSchemaTransformProvider.ERROR;
+import static 
org.apache.beam.sdk.io.datadog.DatadogWriteSchemaTransformProvider.INPUT;
+import static 
org.apache.beam.sdk.io.datadog.DatadogWriteSchemaTransformProvider.OUTPUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class DatadogWriteSchemaTransformProviderTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DatadogWriteSchemaTransformProviderTest.class);
+
+  @Rule public TestPipeline p = TestPipeline.create();
+
+  private static final Schema SCHEMA =
+      Schema.builder()
+          .addStringField("ddsource")
+          .addNullableField("ddtags", Schema.FieldType.STRING)
+          .addStringField("hostname")
+          .addNullableField("service", Schema.FieldType.STRING)
+          .addStringField("message")
+          .build();
+
+  private static final List<Row> ROWS =
+      Arrays.asList(
+          Row.withSchema(SCHEMA)
+              .withFieldValue("ddsource", "my-source")
+              .withFieldValue("ddtags", "tag1:value1,tag2")
+              .withFieldValue("hostname", "my-host")
+              .withFieldValue("service", "my-service")
+              .withFieldValue("message", "Hello World 1")
+              .build(),
+          Row.withSchema(SCHEMA)
+              .withFieldValue("ddsource", "my-source-2")
+              .withFieldValue("ddtags", null)
+              .withFieldValue("hostname", "my-host-2")
+              .withFieldValue("service", null)
+              .withFieldValue("message", "Hello World 2")
+              .build());
+
+  private List<DatadogEvent> events;
+
+  @org.junit.Before
+  public void setUp() {
+    events =
+        Arrays.asList(
+            DatadogEvent.newBuilder()
+                .withSource("my-source")
+                .withTags("tag1:value1,tag2")
+                .withHostname("my-host")
+                .withService("my-service")
+                .withMessage("Hello World 1")
+                .build(),
+            DatadogEvent.newBuilder()
+                .withSource("my-source-2")
+                .withHostname("my-host-2")
+                .withMessage("Hello World 2")
+                .build());
+  }
+
+  @Test
+  public void testWriteInvalidConfigurations() {
+
+    // apiKey not set
+    assertThrows(
+        IllegalStateException.class,
+        () -> {
+          DatadogWriteSchemaTransformConfiguration.builder()
+              .setUrl("http://localhost:8080")
+              //   .setApiKey("test-api-key") # ApiKey is mandatory
+              .build()
+              .validate();
+        });
+
+    // url not set
+    assertThrows(
+        IllegalStateException.class,
+        () -> {
+          DatadogWriteSchemaTransformConfiguration.builder()
+              //   .setUrl("http://localhost:8080") # Url is mandatory
+              .setApiKey("test-api-key")
+              .build()
+              .validate();
+        });
+  }
+
+  @Test
+  public void testWriteBuildTransform() {
+    DatadogWriteSchemaTransformProvider provider = new 
DatadogWriteSchemaTransformProvider();
+    DatadogWriteSchemaTransformConfiguration configuration =
+        DatadogWriteSchemaTransformConfiguration.builder()
+            .setApiKey("test-api-key")
+            .setUrl("http://localhost:8080")
+            .build();
+
+    provider.from(configuration);
+  }
+
+  @Test
+  public void testWriteBuildTransformAndRun() {
+    DatadogWriteSchemaTransformProvider provider = new 
DatadogWriteSchemaTransformProvider();
+    DatadogWriteSchemaTransformConfiguration configuration =
+        DatadogWriteSchemaTransformConfiguration.builder()
+            .setApiKey("test-api-key")
+            .setUrl("http://localhost:8080")
+            .build();
+
+    SchemaTransform transform = provider.from(configuration);
+
+    PCollection<Row> input = p.apply("Create", 
Create.of(ROWS).withRowSchema(SCHEMA));
+    PCollectionRowTuple inputTuple = PCollectionRowTuple.of(INPUT, input);
+    PCollectionRowTuple output = transform.expand(inputTuple);
+    assertEquals(1, output.getAll().size());
+    assertTrue(output.has(ERROR));
+
+    assertThrows(PipelineExecutionException.class, () -> 
p.run().waitUntilFinish());
+  }
+
+  @Test
+  public void testWriteBuildTransformWithCorrectFields() {
+    ServiceLoader<SchemaTransformProvider> serviceLoader =
+        ServiceLoader.load(SchemaTransformProvider.class);
+    List<SchemaTransformProvider> providers =
+        StreamSupport.stream(serviceLoader.spliterator(), false)
+            .filter(provider -> provider.getClass() == 
DatadogWriteSchemaTransformProvider.class)
+            .collect(Collectors.toList());
+    SchemaTransformProvider datadogProvider = providers.get(0);
+    assertEquals(datadogProvider.outputCollectionNames(), 
Lists.newArrayList(ERROR));
+
+    assertEquals(
+        Sets.newHashSet(
+            "url",
+            "api_key",
+            "min_batch_count",
+            "batch_count",
+            "max_buffer_size",
+            "parallelism",
+            "error_handling"),
+        datadogProvider.configurationSchema().getFields().stream()
+            .map(field -> field.getName())
+            .collect(Collectors.toSet()));
+  }
+
+  @Test
+  public void testRowToDatadogEvent() {
+    for (int i = 0; i < ROWS.size(); i++) {
+      DatadogEvent actual = 
DatadogWriteSchemaTransformProvider.rowToEvent(ROWS.get(i));
+      assertEquals(events.get(i), actual);
+    }
+  }
+
+  @Test
+  public void testRowToDatadogEventWithMissingOptionalFields() {
+    Schema missingFieldsSchema =
+        Schema.builder()
+            .addStringField("ddsource")
+            .addStringField("hostname")
+            .addStringField("message")
+            .build();
+
+    Row row =
+        Row.withSchema(missingFieldsSchema)
+            .withFieldValue("ddsource", "my-source")
+            .withFieldValue("hostname", "my-host")
+            .withFieldValue("message", "Hello World 1")
+            .build();
+
+    DatadogEvent expectedEvent =
+        DatadogEvent.newBuilder()
+            .withSource("my-source")
+            .withHostname("my-host")
+            .withMessage("Hello World 1")
+            .build();
+
+    DatadogEvent actual = DatadogWriteSchemaTransformProvider.rowToEvent(row);
+    assertEquals(expectedEvent, actual);
+  }
+
+  /**
+   * A row with an additional {@code extra_field} column converts to the same event as one built
+   * from the source, tags, hostname, service and message values alone.
+   */
+  @Test
+  public void testRowToDatadogEventWithExtraFields_DiscardsExtraFields() {
+    DatadogEvent expectedEvent =
+        DatadogEvent.newBuilder()
+            .withSource("my-source")
+            .withTags("tag1:value1,tag2")
+            .withHostname("my-host")
+            .withService("my-service")
+            .withMessage("Hello World 1")
+            .build();
+
+    Schema schemaWithExtraColumn =
+        Schema.builder()
+            .addStringField("ddsource")
+            .addNullableField("ddtags", Schema.FieldType.STRING)
+            .addStringField("hostname")
+            .addNullableField("service", Schema.FieldType.STRING)
+            .addStringField("message")
+            .addStringField("extra_field")
+            .build();
+    Row rowWithExtraColumn =
+        Row.withSchema(schemaWithExtraColumn)
+            .withFieldValue("ddsource", "my-source")
+            .withFieldValue("ddtags", "tag1:value1,tag2")
+            .withFieldValue("hostname", "my-host")
+            .withFieldValue("service", "my-service")
+            .withFieldValue("message", "Hello World 1")
+            .withFieldValue("extra_field", "extra_value")
+            .build();
+
+    DatadogEvent converted = DatadogWriteSchemaTransformProvider.rowToEvent(rowWithExtraColumn);
+    assertEquals(expectedEvent, converted);
+  }
+
+  /**
+   * Converting a row whose nullable {@code message} column holds {@code null} must raise a
+   * {@link NullPointerException}.
+   */
+  @Test
+  public void testRowToDatadogEventWithNullRequiredField() {
+    Schema nullSchema =
+        Schema.builder()
+            .addStringField("ddsource")
+            .addNullableField("ddtags", Schema.FieldType.STRING)
+            .addStringField("hostname")
+            .addNullableField("service", Schema.FieldType.STRING)
+            .addNullableField("message", Schema.FieldType.STRING)
+            .build();
+
+    Row row =
+        Row.withSchema(nullSchema)
+            .withFieldValue("ddsource", "my-source")
+            .withFieldValue("ddtags", "tag1:value1,tag2")
+            .withFieldValue("hostname", "my-host")
+            .withFieldValue("service", "my-service")
+            .withFieldValue("message", null)
+            .build();
+
+    // Scope the expectation to the conversion itself so that an exception thrown while building
+    // the fixture above cannot make the test pass by accident.
+    assertThrows(
+        NullPointerException.class, () -> DatadogWriteSchemaTransformProvider.rowToEvent(row));
+  }
+
+  @Test
+  public void testBuildTransformWithAllParameters() {
+    DatadogWriteSchemaTransformProvider provider = new 
DatadogWriteSchemaTransformProvider();
+    DatadogWriteSchemaTransformConfiguration configuration =
+        DatadogWriteSchemaTransformConfiguration.builder()
+            .setApiKey("test-api-key")
+            .setUrl("http://localhost:8080";)
+            .setBatchCount(10)
+            .setMaxBufferSize(100L)
+            .setParallelism(2)
+            .build();
+
+    SchemaTransform transform = provider.from(configuration);
+
+    PCollection<Row> input = p.apply("Create", 
Create.of(ROWS).withRowSchema(SCHEMA));
+    PCollectionRowTuple inputTuple = PCollectionRowTuple.of(INPUT, input);
+    PCollectionRowTuple output = transform.expand(inputTuple);
+    assertEquals(1, output.getAll().size());
+    assertTrue(output.has(ERROR));
+
+    assertThrows(PipelineExecutionException.class, () -> 
p.run().waitUntilFinish());
+  }
+
+  /** A configuration that sets only the API key (no URL) must be rejected. */
+  @Test
+  public void testBuildTransformMissingUrl() {
+    DatadogWriteSchemaTransformProvider provider = new DatadogWriteSchemaTransformProvider();
+    // build() and from() both stay inside the lambda: this test does not pin which of the two
+    // raises the IllegalStateException.
+    assertThrows(
+        IllegalStateException.class,
+        () ->
+            provider.from(
+                DatadogWriteSchemaTransformConfiguration.builder()
+                    .setApiKey("test-api-key")
+                    .build()));
+  }
+
+  /** A configuration that sets only the URL (no API key) must be rejected. */
+  @Test
+  public void testBuildTransformMissingApiKey() {
+    DatadogWriteSchemaTransformProvider provider = new DatadogWriteSchemaTransformProvider();
+    // build() and from() both stay inside the lambda: this test does not pin which of the two
+    // raises the IllegalStateException.
+    assertThrows(
+        IllegalStateException.class,
+        () ->
+            provider.from(
+                DatadogWriteSchemaTransformConfiguration.builder().setUrl("test-url").build()));
+  }
+
+  /** {@code validate()} must reject a configuration whose parallelism is zero. */
+  @Test
+  public void testBuildTransformWithInvalidParallelism() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          DatadogWriteSchemaTransformConfiguration.builder()
+              .setApiKey("test-api-key")
+              .setUrl("http://localhost:8080")
+              .setParallelism(0)
+              .build()
+              .validate();
+        });
+  }
+
+  /**
+   * {@code validate()} must reject a configuration whose batch count is zero while the minimum
+   * batch count is one.
+   */
+  @Test
+  public void testBuildTransformWithInvalidBatchCount() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          DatadogWriteSchemaTransformConfiguration.builder()
+              .setApiKey("test-api-key")
+              .setUrl("http://localhost:8080")
+              .setBatchCount(0)
+              .setMinBatchCount(1)
+              .build()
+              .validate();
+        });
+  }
+
+  /**
+   * Builds the transform from a configuration {@link Row} (snake_case field names, with error
+   * handling enabled), checks the expanded output and runs the pipeline to completion.
+   */
+  @Test
+  public void testBuildTransformFromRowConfiguration() throws NoSuchSchemaException {
+    DatadogWriteSchemaTransformProvider provider = new DatadogWriteSchemaTransformProvider();
+    Schema configSchema = provider.configurationSchema();
+    Schema errorHandlingSchema = configSchema.getField("error_handling").getType().getRowSchema();
+
+    Row errorHandlingRow =
+        Row.withSchema(errorHandlingSchema).withFieldValue(OUTPUT, ERROR).build();
+
+    Row configRow =
+        Row.withSchema(configSchema)
+            .withFieldValue("url", "http://localhost:8080")
+            .withFieldValue("api_key", "test-api-key")
+            .withFieldValue("min_batch_count", null)
+            .withFieldValue("batch_count", 10)
+            .withFieldValue("max_buffer_size", 100L)
+            .withFieldValue("parallelism", 2)
+            .withFieldValue("error_handling", errorHandlingRow)
+            .build();
+
+    SchemaTransform transform = provider.from(configRow);
+
+    PCollection<Row> input = p.apply("Create", Create.of(ROWS).withRowSchema(SCHEMA));
+    PCollectionRowTuple inputTuple = PCollectionRowTuple.of(INPUT, input);
+    PCollectionRowTuple output = transform.expand(inputTuple);
+    assertEquals(1, output.getAll().size());
+    assertTrue(output.has(ERROR));
+
+    // Unlike the run without error handling, this one is expected to finish without throwing.
+    p.run().waitUntilFinish();
+  }
+
+  /**
+   * Converting a row whose {@code ddtags} column is INT64 instead of a string must raise a
+   * {@link ClassCastException}.
+   */
+  @Test
+  public void testRowToDatadogEventWithWrongType() {
+    Schema wrongSchema =
+        Schema.builder()
+            .addStringField("ddsource")
+            .addInt64Field("ddtags")
+            .addStringField("hostname")
+            .addStringField("message")
+            .build();
+
+    Row row =
+        Row.withSchema(wrongSchema)
+            .withFieldValue("ddsource", "my-source")
+            .withFieldValue("ddtags", 123L)
+            .withFieldValue("hostname", "my-host")
+            .withFieldValue("message", "Hello World 1")
+            .build();
+
+    // Scope the expectation to the conversion itself so that an exception thrown while building
+    // the fixture above cannot make the test pass by accident.
+    assertThrows(
+        ClassCastException.class, () -> DatadogWriteSchemaTransformProvider.rowToEvent(row));
+  }
+
+  @Test
+  public void testErrorHandling() {
+    DatadogWriteSchemaTransformProvider provider = new 
DatadogWriteSchemaTransformProvider();
+    ErrorHandling errorHandling = 
ErrorHandling.builder().setOutput(ERROR).build();
+    DatadogWriteSchemaTransformConfiguration configuration =
+        DatadogWriteSchemaTransformConfiguration.builder()
+            .setApiKey("test-api-key")
+            .setUrl("http://localhost:8080";)
+            .setBatchCount(10)
+            .setMaxBufferSize(1L)
+            .setParallelism(1)
+            .setErrorHandling(errorHandling)
+            .build();
+
+    Schema nullSchema =
+        Schema.builder()
+            .addStringField("ddsource")
+            .addNullableField("ddtags", Schema.FieldType.STRING)
+            .addStringField("hostname")
+            .addNullableField("service", Schema.FieldType.STRING)
+            .addNullableField("message", Schema.FieldType.STRING)
+            .build();
+
+    Row row =
+        Row.withSchema(nullSchema)
+            .withFieldValue("ddsource", "my-source")
+            .withFieldValue("ddtags", "tag1:value1,tag2")
+            .withFieldValue("hostname", "my-host")
+            .withFieldValue("service", "my-service")
+            .withFieldValue("message", null)
+            .build();
+
+    PCollection<Row> input = p.apply(Create.of(row).withRowSchema(nullSchema));
+    PCollectionRowTuple inputTuple = PCollectionRowTuple.of(INPUT, input);
+
+    SchemaTransform transform = provider.from(configuration);
+    PCollectionRowTuple outputTuple = transform.expand(inputTuple);
+
+    assertTrue(outputTuple.has(ERROR));
+    PAssert.that(outputTuple.get(ERROR))
+        .satisfies(
+            (errors) -> {
+              assertEquals(1, errors.spliterator().getExactSizeIfKnown());
+              Row error = errors.iterator().next();
+              assertEquals(row.toString(), error.getString("payload"));
+              assertEquals(
+                  (Integer) java.net.HttpURLConnection.HTTP_BAD_REQUEST,
+                  error.getInt32("statusCode"));
+              assertTrue(
+                  "Expected status message to contain 'Message is required.'",
+                  error.getString("statusMessage").contains("Message is 
required."));
+              return null;
+            });
+
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testErrorHandlingWithMissingRequiredField() {
+    DatadogWriteSchemaTransformProvider provider = new 
DatadogWriteSchemaTransformProvider();
+    ErrorHandling errorHandling = 
ErrorHandling.builder().setOutput(ERROR).build();
+    DatadogWriteSchemaTransformConfiguration configuration =
+        DatadogWriteSchemaTransformConfiguration.builder()
+            .setApiKey("test-api-key")
+            .setUrl("http://localhost:8080";)
+            .setErrorHandling(errorHandling)
+            .build();
+
+    Schema missingFieldSchema =
+        
Schema.builder().addStringField("ddsource").addStringField("hostname").build();
+
+    Row row =
+        Row.withSchema(missingFieldSchema)
+            .withFieldValue("ddsource", "my-source")
+            .withFieldValue("hostname", "my-host")
+            .build();
+
+    PCollection<Row> input = 
p.apply(Create.of(row).withRowSchema(missingFieldSchema));
+    PCollectionRowTuple inputTuple = PCollectionRowTuple.of(INPUT, input);
+
+    SchemaTransform transform = provider.from(configuration);
+    PCollectionRowTuple outputTuple = transform.expand(inputTuple);
+
+    assertTrue(outputTuple.has(ERROR));
+    PAssert.that(outputTuple.get(ERROR))
+        .satisfies(
+            (errors) -> {
+              assertEquals(1, errors.spliterator().getExactSizeIfKnown());
+              Row error = errors.iterator().next();
+              assertEquals(row.toString(), error.getString("payload"));
+              assertEquals(
+                  (Integer) java.net.HttpURLConnection.HTTP_BAD_REQUEST,
+                  error.getInt32("statusCode"));
+              assertTrue(
+                  "Expected status message to contain 'Message is required.'",
+                  error.getString("statusMessage").contains("Message is 
required."));
+              return null;
+            });
+
+    p.run().waitUntilFinish();
+  }
+
+  /**
+   * Checks the schema inferred for {@code DatadogWriteSchemaTransformConfiguration}: seven
+   * camelCase fields, with {@code url} and {@code apiKey} required and every other field nullable.
+   */
+  @Test
+  public void testConfigurationSchema() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    registry.registerSchemaProvider(ErrorHandling.class, new AutoValueSchema());
+    Schema schema = registry.getSchema(DatadogWriteSchemaTransformConfiguration.class);
+    Schema errorHandlingSchema = registry.getSchema(ErrorHandling.class);
+
+    LOG.info("Schema fields: {}", schema.getFieldNames());
+
+    assertEquals(7, schema.getFieldCount());
+    assertTrue(schema.hasField("url"));
+    assertTrue(schema.hasField("apiKey"));
+    assertTrue(schema.hasField("minBatchCount"));
+    assertTrue(schema.hasField("batchCount"));
+    assertTrue(schema.hasField("maxBufferSize"));
+    assertTrue(schema.hasField("parallelism"));
+    assertTrue(schema.hasField("errorHandling"));
+
+    LOG.info("URL field type: {}", schema.getField("url").getType().getTypeName());
+    assertEquals(Schema.FieldType.STRING.withNullable(false), schema.getField("url").getType());
+    assertEquals(Schema.FieldType.STRING.withNullable(false), schema.getField("apiKey").getType());
+    // minBatchCount was the only field whose type went unchecked; it is published as a nullable
+    // int32 alongside batchCount and parallelism.
+    assertEquals(
+        Schema.FieldType.INT32.withNullable(true), schema.getField("minBatchCount").getType());
+    assertEquals(
+        Schema.FieldType.INT32.withNullable(true), schema.getField("batchCount").getType());
+    assertEquals(
+        Schema.FieldType.INT64.withNullable(true), schema.getField("maxBufferSize").getType());
+    assertEquals(
+        Schema.FieldType.INT32.withNullable(true), schema.getField("parallelism").getType());
+    assertEquals(
+        Schema.FieldType.row(errorHandlingSchema).withNullable(true),
+        schema.getField("errorHandling").getType());
+  }
+}
diff --git a/sdks/java/io/expansion-service/build.gradle 
b/sdks/java/io/expansion-service/build.gradle
index 32894b97809..60ef89ed223 100644
--- a/sdks/java/io/expansion-service/build.gradle
+++ b/sdks/java/io/expansion-service/build.gradle
@@ -96,6 +96,7 @@ dependencies {
   runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: 
"shadow")
   runtimeOnly library.java.bigdataoss_util_hadoop
 
+  runtimeOnly project(":sdks:java:io:datadog")
   runtimeOnly project(":sdks:java:io:mongodb")
   runtimeOnly library.java.kafka_clients
   runtimeOnly library.java.slf4j_jdk14
diff --git a/sdks/python/apache_beam/yaml/integration_tests.py 
b/sdks/python/apache_beam/yaml/integration_tests.py
index 150c0ca8625..e319a3d3a9b 100644
--- a/sdks/python/apache_beam/yaml/integration_tests.py
+++ b/sdks/python/apache_beam/yaml/integration_tests.py
@@ -21,18 +21,52 @@ import contextlib
 import copy
 import glob
 import itertools
+import json
 import logging
 import os
 import random
 import secrets
 import sqlite3
 import string
+import struct
 import unittest
 import uuid
 from datetime import datetime
 from datetime import timezone
 
 import mock
+
+from apache_beam.coders import Coder
+from apache_beam.coders.coder_impl import CoderImpl
+from apache_beam.yaml.test_utils.datadog_test_utils import 
temp_fake_datadog_server
+
+
+class BigEndianIntegerCoderImpl(CoderImpl):
+  """Coder implementation for big-endian integers used in cross-language tests.
+  
+  This is needed because Java's BigEndianIntegerCoder falls back to the generic
+  'beam:coders:javasdk:0.1' URN when used in cross-language pipelines, and
+  Python's FnApiRunner needs to know how to decode it.
+  """
+  def encode_to_stream(self, value, stream, nested):
+    stream.write(struct.pack('>i', value))
+
+  def decode_from_stream(self, stream, nested):
+    return struct.unpack('>i', stream.read(4))[0]
+
+
+class BigEndianIntegerCoder(Coder):
+  """Coder whose implementation is a BigEndianIntegerCoderImpl."""
+  def get_impl(self):
+    return BigEndianIntegerCoderImpl()
+
+
+# Register the coder with the fallback URN used by the Java SDK for this coder.
+# This allows the Python FnApiRunner to handle data sharded by Java transforms
+# using BigEndianIntegerCoder in integration tests.
+# NOTE(review): the URN is described above as a generic Java fallback, so this
+# registration presumably applies to every coder that arrives under it, not
+# only BigEndianIntegerCoder -- confirm no other test depends on that URN.
+Coder.register_urn(
+    'beam:coders:javasdk:0.1',
+    None, lambda payload, components, context: BigEndianIntegerCoder())
+
 import psycopg2
 import pytds
 import sqlalchemy
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml 
b/sdks/python/apache_beam/yaml/standard_io.yaml
index 1790dc0f5a9..520c466b600 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -431,6 +431,27 @@
       config:
         gradle_target: 'sdks:java:io:expansion-service:shadowJar'
 
+#Datadog
+- type: renaming
+  transforms:
+    'WriteToDatadog': 'WriteToDatadog'
+  config:
+    mappings:
+      'WriteToDatadog':
+        'url': 'url'
+        'api_key': 'api_key'
+        'min_batch_count': 'min_batch_count'
+        'batch_count': 'batch_count'
+        'max_buffer_size': 'max_buffer_size'
+        'parallelism': 'parallelism'
+        'error_handling': 'error_handling'
+    underlying_provider:
+      type: beamJar
+      transforms:
+        'WriteToDatadog': 'beam:schematransform:org.apache.beam:datadog_write:v1'
+      config:
+        gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+
 #MongoDB
 - type: renaming
   transforms:
diff --git a/sdks/python/apache_beam/yaml/test_utils/__init__.py 
b/sdks/python/apache_beam/yaml/test_utils/__init__.py
new file mode 100644
index 00000000000..89aea21adcf
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/test_utils/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Helper utilities for YAML integration tests."""
diff --git a/sdks/python/apache_beam/yaml/test_utils/datadog_test_utils.py 
b/sdks/python/apache_beam/yaml/test_utils/datadog_test_utils.py
new file mode 100644
index 00000000000..4a87a346668
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/test_utils/datadog_test_utils.py
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Helper utilities for Datadog integration tests."""
+
+import contextlib
+import gzip
+import http.server
+import io
+import json
+import logging
+import threading
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class DatadogConnection:
+  """Holds the endpoint URL and API key a test uses to reach a Datadog server."""
+  def __init__(self, url, api_key):
+    self.url = url
+    self.api_key = api_key
+
+
+class MockDatadogHandler(http.server.BaseHTTPRequestHandler):
+  def do_POST(self):
+    if self.path == "/api/v2/logs":
+      is_chunked = self.headers.get('Transfer-Encoding',
+                                    '').lower() == 'chunked'
+      is_gzip = self.headers.get('Content-Encoding', '').lower() == 'gzip'
+      content_len = int(self.headers.get('Content-Length', 0))
+
+      try:
+        raw_data = b''
+        if is_chunked:
+          while True:
+            line = self.rfile.readline().strip()
+            if not line:
+              break
+            chunk_len = int(line, 16)
+            if chunk_len == 0:
+              self.rfile.readline()  # Clear trail
+              break
+            raw_data += self.rfile.read(chunk_len)
+            self.rfile.readline()  # Clear trail
+        elif content_len > 0:
+          raw_data = self.rfile.read(content_len)
+
+        if raw_data and is_gzip:
+          with gzip.GzipFile(fileobj=io.BytesIO(raw_data)) as f:
+            raw_data = f.read()
+
+        if raw_data:
+          data = json.loads(raw_data)
+          with self.server.record_lock:
+            if isinstance(data, list):
+              self.server.received_records.extend(data)
+            else:
+              self.server.received_records.append(data)
+      except Exception as e:
+        logging.error("CRITICAL: Failure unpacking mock datadog payload: %s", 
e)
+
+      self.send_response(200)
+      self.send_header('Content-Type', 'application/json')
+      self.end_headers()
+      self.wfile.write(b'{"status": "ok"}')
+    else:
+      self.send_response(404)
+      self.end_headers()
+
+  def log_message(self, format, *args):
+    pass
+
+
[email protected]
+def temp_datadog_mock_server(received_records):
+  server = http.server.ThreadingHTTPServer(('localhost', 0), 
MockDatadogHandler)
+  server.received_records = received_records
+  server.record_lock = threading.Lock()
+  ip, port = server.server_address
+  thread = threading.Thread(target=server.serve_forever)
+  thread.daemon = True
+  thread.start()
+  try:
+    yield f"http://{ip}:{port}";
+  finally:
+    server.shutdown()
+    server.server_close()
+    thread.join()
+
+
[email protected]
+def temp_fake_datadog_server(expected_records=None):
+  """Context manager to provide a temporary fake Datadog server for testing.
+  """
+  received = []
+  with temp_datadog_mock_server(received) as mock_url:
+    try:
+      yield DatadogConnection(
+          url=mock_url,
+          api_key="dummy_key_for_testing",
+      )
+    except Exception as err:
+      logging.error(
+          "Error interacting with temporary fake Datadog server: %s", err)
+      raise err
+    finally:
+      if expected_records is not None:
+
+        canonicalize = lambda rec: json.dumps(rec, sort_keys=True)
+
+        actual_strs = sorted([canonicalize(r) for r in received])
+        expected_strs = sorted([canonicalize(e) for e in expected_records])
+
+        assert actual_strs == expected_strs, (
+            f"Mismatch in recorded Datadog events!\n"
+            f"Expected: {expected_strs}\n"
+            f"Actual:   {actual_strs}"
+        )
diff --git a/sdks/python/apache_beam/yaml/tests/datadog.yaml 
b/sdks/python/apache_beam/yaml/tests/datadog.yaml
new file mode 100644
index 00000000000..24485bfbaf4
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/datadog.yaml
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+  - name: FAKE_DATADOG_SERVER
+    type: "apache_beam.yaml.test_utils.datadog_test_utils.temp_fake_datadog_server"
+    config:
+      expected_records:
+        - { ddsource: "apache-beam1", ddtags: "test-tags1", hostname: "test-host", service: "test-service", message: "Event for label 11a" }
+        - { ddsource: "apache-beam2", ddtags: "test-tags2", hostname: "test-host", service: "test-service", message: "Event for label 37a" }
+        - { ddsource: "apache-beam3", ddtags: "test-tags3", hostname: "test-host", service: "test-service", message: "Event for label 389a" }
+        - { ddsource: "apache-beam4", ddtags: "test-tags4", hostname: "test-host", service: "test-service", message: "Event for label 3821b" }
+
+pipelines:
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - { ddsource: "apache-beam1", ddtags: "test-tags1", hostname: "test-host", service: "test-service", message: "Event for label 11a" }
+              - { ddsource: "apache-beam2", ddtags: "test-tags2", hostname: "test-host", service: "test-service", message: "Event for label 37a" }
+              - { ddsource: "apache-beam3", ddtags: "test-tags3", hostname: "test-host", service: "test-service", message: "Event for label 389a" }
+              - { ddsource: "apache-beam4", ddtags: "test-tags4", hostname: "test-host", service: "test-service", message: "Event for label 3821b" }
+              - { ddsource: "apache-beam-broken", hostname: "test-host" } # Triggers mandatory field failure
+        - type: WriteToDatadog
+          input: Create
+          config:
+            url: "{FAKE_DATADOG_SERVER.url}"
+            api_key: "{FAKE_DATADOG_SERVER.api_key}"
+            min_batch_count: 1
+            batch_count: 2
+            max_buffer_size: 1000
+            parallelism: 1
+            error_handling:
+              output: error_output
+        - type: MapToFields
+          input: WriteToDatadog.error_output
+          config:
+            language: python
+            fields:
+              failed_source: "failed_row.ddsource"
+        - type: AssertEqual
+          input: MapToFields
+          config:
+            elements:
+              - { failed_source: "apache-beam-broken" }
+        # Asserting good records is taken care of by the fixture
+
diff --git a/sdks/standard_external_transforms.yaml 
b/sdks/standard_external_transforms.yaml
index 057c4e3f47d..b50402a64d5 100644
--- a/sdks/standard_external_transforms.yaml
+++ b/sdks/standard_external_transforms.yaml
@@ -21,6 +21,41 @@
 #
 # Last updated on: 2026-05-06
 
+- default_service: sdks:java:io:expansion-service:shadowJar
+  description: ''
+  destinations:
+    python: apache_beam/io
+  fields:
+  - description: The Datadog API key.
+    name: api_key
+    nullable: false
+    type: str
+  - description: The number of events to batch together for each write.
+    name: batch_count
+    nullable: true
+    type: int32
+  - description: Specifies how to handle errors.
+    name: error_handling
+    nullable: true
+    type: Row(output=<class 'str'>)
+  - description: The maximum buffer size in bytes.
+    name: max_buffer_size
+    nullable: true
+    type: int64
+  - description: The minimum number of events to batch together for each write.
+    name: min_batch_count
+    nullable: true
+    type: int32
+  - description: The degree of parallelism for writing.
+    name: parallelism
+    nullable: true
+    type: int32
+  - description: The Datadog API URL.
+    name: url
+    nullable: false
+    type: str
+  identifier: beam:schematransform:org.apache.beam:datadog_write:v1
+  name: DatadogWrite
 - default_service: sdks:java:io:expansion-service:shadowJar
   description: 'Outputs a PCollection of Beam Rows, each containing a single 
INT64
     number called "value". The count is produced from the given "start" value 
and

Reply via email to