This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 84b467b45a5 Add Datadog IO (#37319)
84b467b45a5 is described below

commit 84b467b45a532f9d75c372248ec8fc5951a7358d
Author: Derrick Williams <[email protected]>
AuthorDate: Wed Jan 21 11:56:59 2026 -0500

    Add Datadog IO (#37319)
    
    * porting and some improvements
    
    * fix some publisher issues
    
    * checkstyle, import, etc changes
    
    * add changes info on datadog
    
    * updated formatting and correct issue link
    
    * fix format issue
    
    * fix comments
    
    * add precommit for datadog
    
    * fix nullable annotation
---
 .github/workflows/README.md                        |   1 +
 .../beam_PreCommit_Java_Datadog_IO_Direct.yml      | 120 +++++
 CHANGES.md                                         |  10 +-
 build.gradle.kts                                   |   1 +
 sdks/java/io/datadog/build.gradle                  |  48 ++
 .../apache/beam/sdk/io/datadog/DatadogEvent.java   |  97 ++++
 .../beam/sdk/io/datadog/DatadogEventCoder.java     |  94 ++++
 .../beam/sdk/io/datadog/DatadogEventPublisher.java | 330 ++++++++++++
 .../sdk/io/datadog/DatadogEventSerializer.java     |  45 ++
 .../beam/sdk/io/datadog/DatadogEventWriter.java    | 521 +++++++++++++++++++
 .../org/apache/beam/sdk/io/datadog/DatadogIO.java  | 236 +++++++++
 .../beam/sdk/io/datadog/DatadogWriteError.java     |  77 +++
 .../sdk/io/datadog/DatadogWriteErrorCoder.java     |  88 ++++
 .../apache/beam/sdk/io/datadog/package-info.java   |  28 +
 .../beam/sdk/io/datadog/DatadogEventCoderTest.java |  65 +++
 .../sdk/io/datadog/DatadogEventPublisherTest.java  | 176 +++++++
 .../sdk/io/datadog/DatadogEventSerializerTest.java |  95 ++++
 .../beam/sdk/io/datadog/DatadogEventTest.java      |  73 +++
 .../sdk/io/datadog/DatadogEventWriterTest.java     | 566 +++++++++++++++++++++
 .../apache/beam/sdk/io/datadog/DatadogIOTest.java  | 172 +++++++
 .../sdk/io/datadog/DatadogWriteErrorCoderTest.java |  61 +++
 .../beam/sdk/io/datadog/DatadogWriteErrorTest.java |  66 +++
 settings.gradle.kts                                |   1 +
 23 files changed, 2966 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/README.md b/.github/workflows/README.md
index 283be9c2b1f..376e3d0af54 100644
--- a/.github/workflows/README.md
+++ b/.github/workflows/README.md
@@ -235,6 +235,7 @@ PreCommit Jobs run in a schedule and also get triggered in 
a PR if relevant sour
 | [ PreCommit Java Cdap IO Direct 
](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Cdap_IO_Direct.yml)
 | N/A |`Run Java_Cdap_IO_Direct PreCommit`| 
[![.github/workflows/beam_PreCommit_Java_Cdap_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Cdap_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Cdap_IO_Direct.yml?query=event%3Aschedule)
 |
 | [ PreCommit Java Clickhouse IO Direct 
](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Clickhouse_IO_Direct.yml)
 | N/A |`Run Java_Clickhouse_IO_Direct PreCommit`| 
[![.github/workflows/beam_PreCommit_Java_Clickhouse_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Clickhouse_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Clickhouse_IO_Direct.yml?query=event%3Aschedule)
 |
 | [ PreCommit Java Csv IO Direct 
](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Csv_IO_Direct.yml)
 | N/A |`Run Java_Csv_IO_Direct PreCommit`| 
[![.github/workflows/beam_PreCommit_Java_Csv_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Csv_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Csv_IO_Direct.yml?query=event%3Aschedule)
 |
+| [ PreCommit Java Datadog IO Direct 
](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml)
 | N/A |`Run Java_Datadog_IO_Direct PreCommit`| 
[![.github/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml?query=event%3Aschedule)
 |
 | [ PreCommit Java Debezium IO Direct 
](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Debezium_IO_Direct.yml)
 | N/A |`Run Java_Debezium_IO_Direct PreCommit`| 
[![.github/workflows/beam_PreCommit_Java_Debezium_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Debezium_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Debezium_IO_Direct.yml?query=event%3Aschedule)
 |
 | [ PreCommit Java ElasticSearch IO Direct 
](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_ElasticSearch_IO_Direct.yml)
 | N/A |`Run Java_ElasticSearch_IO_Direct PreCommit`| 
[![.github/workflows/beam_PreCommit_Java_ElasticSearch_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_ElasticSearch_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_ElasticSearch_IO_Direct.yml?query
 [...]
 | [ PreCommit Java Examples Dataflow 
](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Examples_Dataflow.yml)
 | N/A |`Run Java_Examples_Dataflow PreCommit`| 
[![.github/workflows/beam_PreCommit_Java_Examples_Dataflow.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Examples_Dataflow.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Examples_Dataflow.yml?query=event%3Aschedule)
 |
diff --git a/.github/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml 
b/.github/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml
new file mode 100644
index 00000000000..08bebf31f6b
--- /dev/null
+++ b/.github/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PreCommit Java Datadog IO Direct
+
+on:
+  push:
+    tags: ['v*']
+    branches: ['master', 'release-*']
+    paths:
+      - "sdks/java/io/datadog/**"
+      - ".github/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml"
+  pull_request_target:
+    branches: ['master', 'release-*']
+    paths:
+      - "sdks/java/io/datadog/**"
+      - 'release/trigger_all_tests.json'
+      - '.github/trigger_files/beam_PreCommit_Java_Datadog_IO_Direct.json'
+  issue_comment:
+    types: [created]
+  schedule:
+    - cron: '15 1/6 * * *'
+  workflow_dispatch:
+
+#Setting explicit permissions for the action to avoid the default permissions 
which are `write-all` in case of pull_request_target event
+permissions:
+  actions: write
+  pull-requests: write
+  checks: write
+  contents: read
+  deployments: read
+  id-token: none
+  issues: write
+  discussions: read
+  packages: read
+  pages: read
+  repository-projects: read
+  security-events: read
+  statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.issue.number || 
github.event.pull_request.head.label || github.sha || github.head_ref || 
github.ref }}-${{ github.event.schedule || github.event.comment.id || 
github.event.sender.login }}'
+  cancel-in-progress: true
+
+env:
+  DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
+  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+  beam_PreCommit_Java_Datadog_IO_Direct:
+    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+    strategy:
+      matrix:
+        job_name: ["beam_PreCommit_Java_Datadog_IO_Direct"]
+        job_phrase: ["Run Java_Datadog_IO_Direct PreCommit"]
+    timeout-minutes: 60
+    if: |
+      github.event_name == 'push' ||
+      github.event_name == 'pull_request_target' ||
+      (github.event_name == 'schedule' && github.repository == 'apache/beam') 
||
+      github.event_name == 'workflow_dispatch' ||
+      github.event.comment.body == 'Run Java_Datadog_IO_Direct PreCommit'
+    runs-on: [self-hosted, ubuntu-20.04, main]
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup repository
+        uses: ./.github/actions/setup-action
+        with:
+          comment_phrase: ${{ matrix.job_phrase }}
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+      - name: Setup environment
+        uses: ./.github/actions/setup-environment-action
+      - name: run Datadog IO build script
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        with:
+          gradle-command: :sdks:java:io:datadog:build
+          arguments: |
+            -PdisableSpotlessCheck=true \
+            -PdisableCheckStyle=true \
+      - name: Archive JUnit Test Results
+        uses: actions/upload-artifact@v4
+        if: ${{ !success() }}
+        with:
+          name: JUnit Test Results
+          path: "**/build/reports/tests/"
+      - name: Publish JUnit Test Results
+        uses: EnricoMi/publish-unit-test-result-action@v2
+        if: always()
+        with:
+          commit: '${{ env.prsha || env.GITHUB_SHA }}'
+          comment_mode: ${{ github.event_name == 'issue_comment'  && 'always' 
|| 'off' }}
+          files: '**/build/test-results/**/*.xml'
+          large_files: true
+      - name: Archive SpotBugs Results
+        uses: actions/upload-artifact@v4
+        if: always()
+        with:
+          name: SpotBugs Results
+          path: '**/build/reports/spotbugs/*.html'
+      - name: Publish SpotBugs Results
+        uses: jwgmeligmeyling/[email protected]
+        if: always()
+        with:
+          name: Publish SpotBugs
+          path: '**/build/reports/spotbugs/*.html'
\ No newline at end of file
diff --git a/CHANGES.md b/CHANGES.md
index 6bc2f938cf3..0b152787b8b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -55,7 +55,6 @@
 * ([#X](https://github.com/apache/beam/issues/X)).
 -->
 
-
 # [2.72.0] - Unreleased
 
 ## Highlights
@@ -65,7 +64,7 @@
 
 ## I/Os
 
-* Support for X source added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
+* Add support for Datadog IO (Java) 
([#37318](https://github.com/apache/beam/issues/37318)).
 
 ## New Features / Improvements
 
@@ -113,6 +112,7 @@
 
 ## Known Issues
 
+
 # [2.70.0] - 2025-12-16
 
 ## Highlights
@@ -196,7 +196,7 @@ Now Beam has full support for Milvus integration including 
Milvus enrichment and
 
 ## Highlights
 
-* [Python] Prism runner now enabled by default for most Python pipelines using 
the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This 
may break some tests, see https://github.com/apache/beam/pull/34612 for details 
on how to handle issues.
+* (Python) Prism runner now enabled by default for most Python pipelines using 
the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This 
may break some tests, see https://github.com/apache/beam/pull/34612 for details 
on how to handle issues.
 
 ## I/Os
 
@@ -212,7 +212,7 @@ Now Beam has full support for Milvus integration including 
Milvus enrichment and
   Beam now supports data enrichment capabilities using SQL databases, with 
built-in support for:
   - Managed PostgreSQL, MySQL, and Microsoft SQL Server instances on CloudSQL
   - Unmanaged SQL database instances not hosted on CloudSQL (e.g., self-hosted 
or on-premises databases)
-* [Python] Added the `ReactiveThrottler` and `ThrottlingSignaler` classes to 
streamline throttling behavior in DoFns, expose throttling mechanisms for users 
([#35984](https://github.com/apache/beam/pull/35984))
+* (Python) Added the `ReactiveThrottler` and `ThrottlingSignaler` classes to 
streamline throttling behavior in DoFns, expose throttling mechanisms for users 
([#35984](https://github.com/apache/beam/pull/35984))
 * Added a pipeline option to specify the processing timeout for a single 
element by any PTransform (Java/Python/Go) 
([#35174](https://github.com/apache/beam/issues/35174)).
   - When specified, the SDK harness automatically restarts if an element takes 
too long to process. Beam runner may then retry processing of the same work 
item.
   - Use the `--element_processing_timeout_minutes` option to reduce the chance 
of having stalled pipelines due to unexpected cases of slow processing, where 
slowness might not happen again if processing of the same element is retried.
@@ -2351,4 +2351,4 @@ Schema Options, it will be removed in version `2.23.0`. 
([BEAM-9704](https://iss
 
 ## Highlights
 
-- For versions 2.19.0 and older release notes are available on [Apache Beam 
Blog](https://beam.apache.org/blog/).
+- For versions 2.19.0 and older release notes are available on [Apache Beam 
Blog](https://beam.apache.org/blog/).
\ No newline at end of file
diff --git a/build.gradle.kts b/build.gradle.kts
index 342fe1ee32f..3ae49afa390 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -344,6 +344,7 @@ tasks.register("javaioPreCommit") {
   dependsOn(":sdks:java:io:csv:build")
   dependsOn(":sdks:java:io:cdap:build")
   dependsOn(":sdks:java:io:clickhouse:build")
+  dependsOn(":sdks:java:io:datadog:build")
   dependsOn(":sdks:java:io:debezium:expansion-service:build")
   dependsOn(":sdks:java:io:debezium:build")
   dependsOn(":sdks:java:io:elasticsearch:build")
diff --git a/sdks/java/io/datadog/build.gradle 
b/sdks/java/io/datadog/build.gradle
new file mode 100644
index 00000000000..785d656cead
--- /dev/null
+++ b/sdks/java/io/datadog/build.gradle
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+        automaticModuleName: 'org.apache.beam.sdk.io.datadog'
+)
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Datadog"
+ext.summary = "IO to read and write to Datadog."
+
+dependencies {
+    implementation 
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+    implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation library.java.vendored_guava_32_1_2_jre
+    implementation library.java.joda_time
+    implementation library.java.slf4j_api
+    implementation library.java.google_http_client
+    implementation library.java.google_code_gson
+    implementation library.java.auto_value_annotations
+    testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
+    testImplementation library.java.jupiter_api
+    testRuntimeOnly library.java.jupiter_engine
+    testImplementation library.java.jupiter_params
+    testImplementation library.java.truth
+    testRuntimeOnly project(path: ":runners:direct-java", configuration: 
"shadow")
+    testImplementation project(path: ":sdks:java:io:common")
+    testImplementation group: 'org.mock-server', name: 
'mockserver-client-java', version: '5.10.0'
+    testImplementation group: 'org.mock-server', name: 
'mockserver-junit-rule', version: '5.10.0'
+    implementation library.java.google_http_client_apache_v2
+    implementation library.java.http_client
+    implementation library.java.http_core
+}
diff --git 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java
 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java
new file mode 100644
index 00000000000..80334b5e466
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An AutoValue-based value class describing a single Datadog event.
+ *
+ * <p>An event carries five string properties: {@code ddsource}, {@code ddtags}, {@code hostname},
+ * {@code service} and {@code message}. Every accessor is annotated {@code @Nullable}, but {@link
+ * Builder#build()} rejects an event whose {@code message} was never set, so only the other four
+ * properties are optional for instances created through the builder.
+ */
+@AutoValue
+public abstract class DatadogEvent {
+
+  public static Builder newBuilder() {
+    return new AutoValue_DatadogEvent.Builder();
+  }
+
+  public abstract @Nullable String ddsource();
+
+  public abstract @Nullable String ddtags();
+
+  public abstract @Nullable String hostname();
+
+  public abstract @Nullable String service();
+
+  public abstract @Nullable String message();
+
+  /**
+   * A builder class for creating {@link DatadogEvent} objects.
+   *
+   * <p>The public {@code with*} methods reject {@code null} arguments and then delegate to the
+   * package-private AutoValue setters. {@link #build()} additionally requires that a message has
+   * been set before delegating to the generated {@code autoBuild()}.
+   */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    abstract Builder setDdsource(String source);
+
+    abstract Builder setDdtags(String tags);
+
+    abstract Builder setHostname(String hostname);
+
+    abstract Builder setService(String service);
+
+    abstract Builder setMessage(String message);
+
+    abstract String message();
+
+    abstract DatadogEvent autoBuild();
+
+    public Builder withSource(String source) {
+      checkNotNull(source, "withSource(source) called with null input.");
+
+      return setDdsource(source);
+    }
+
+    public Builder withTags(String tags) {
+      checkNotNull(tags, "withTags(tags) called with null input.");
+
+      return setDdtags(tags);
+    }
+
+    public Builder withHostname(String hostname) {
+      checkNotNull(hostname, "withHostname(hostname) called with null input.");
+
+      return setHostname(hostname);
+    }
+
+    public Builder withService(String service) {
+      checkNotNull(service, "withService(service) called with null input.");
+
+      return setService(service);
+    }
+
+    public Builder withMessage(String message) {
+      checkNotNull(message, "withMessage(message) called with null input.");
+
+      return setMessage(message);
+    }
+
+    public DatadogEvent build() {
+      checkNotNull(message(), "Message is required.");
+
+      return autoBuild();
+    }
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventCoder.java
 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventCoder.java
new file mode 100644
index 00000000000..4e5de996ef5
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventCoder.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link org.apache.beam.sdk.coders.Coder} for {@link DatadogEvent} objects.
+ *
+ * <p>Each event is written as five nullable UTF-8 strings in a fixed order: source, tags,
+ * hostname, service, message. {@link #decode(InputStream)} reads them back in the same order and
+ * only forwards non-null values to the {@link DatadogEvent.Builder}.
+ *
+ * <p>{@link #verifyDeterministic()} always throws, so this coder reports itself as
+ * non-deterministic.
+ */
+public class DatadogEventCoder extends AtomicCoder<DatadogEvent> {
+
+  private static final DatadogEventCoder DATADOG_EVENT_CODER = new 
DatadogEventCoder();
+
+  private static final TypeDescriptor<DatadogEvent> TYPE_DESCRIPTOR =
+      new TypeDescriptor<DatadogEvent>() {};
+  private static final StringUtf8Coder STRING_UTF_8_CODER = 
StringUtf8Coder.of();
+  private static final NullableCoder<String> STRING_NULLABLE_CODER =
+      NullableCoder.of(STRING_UTF_8_CODER);
+
+  public static DatadogEventCoder of() {
+    return DATADOG_EVENT_CODER;
+  }
+
+  @Override
+  public void encode(DatadogEvent value, OutputStream out) throws IOException {
+    STRING_NULLABLE_CODER.encode(value.ddsource(), out);
+    STRING_NULLABLE_CODER.encode(value.ddtags(), out);
+    STRING_NULLABLE_CODER.encode(value.hostname(), out);
+    STRING_NULLABLE_CODER.encode(value.service(), out);
+    STRING_NULLABLE_CODER.encode(value.message(), out);
+  }
+
+  @Override
+  public DatadogEvent decode(InputStream in) throws IOException {
+    DatadogEvent.Builder builder = DatadogEvent.newBuilder();
+
+    String source = STRING_NULLABLE_CODER.decode(in);
+    if (source != null) {
+      builder.withSource(source);
+    }
+
+    String tags = STRING_NULLABLE_CODER.decode(in);
+    if (tags != null) {
+      builder.withTags(tags);
+    }
+
+    String hostname = STRING_NULLABLE_CODER.decode(in);
+    if (hostname != null) {
+      builder.withHostname(hostname);
+    }
+
+    String service = STRING_NULLABLE_CODER.decode(in);
+    if (service != null) {
+      builder.withService(service);
+    }
+
+    String message = STRING_NULLABLE_CODER.decode(in);
+    if (message != null) {
+      builder.withMessage(message);
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public TypeDescriptor<DatadogEvent> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    throw new NonDeterministicException(
+        this, "DatadogEvent can hold arbitrary instances, which may be 
non-deterministic.");
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisher.java
 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisher.java
new file mode 100644
index 00000000000..00a106b2ded
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisher.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.http.ByteArrayContent;
+import com.google.api.client.http.GZipEncoding;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpContent;
+import com.google.api.client.http.HttpMediaType;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestFactory;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
+import com.google.api.client.http.apache.v2.ApacheHttpTransport;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.Set;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.http.client.config.CookieSpecs;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DatadogEventPublisher} is a utility class that helps write {@link 
DatadogEvent}s to a
+ * Datadog Logs API endpoint.
+ */
+@AutoValue
+public abstract class DatadogEventPublisher {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DatadogEventPublisher.class);
+
+  private static final int DEFAULT_MAX_CONNECTIONS = 1;
+
+  @VisibleForTesting protected static final String DD_URL_PATH = "api/v2/logs";
+
+  private static final String DD_API_KEY_HEADER = "dd-api-key";
+
+  private static final String DD_ORIGIN_HEADER = "dd-evp-origin";
+  private static final String DD_ORIGIN_DATAFLOW = "dataflow";
+
+  private static final HttpMediaType MEDIA_TYPE =
+      new HttpMediaType("application/json;charset=utf-8");
+
+  private static final String CONTENT_TYPE =
+      Joiner.on('/').join(MEDIA_TYPE.getType(), MEDIA_TYPE.getSubType());
+
+  private static final String HTTPS_PROTOCOL_PREFIX = "https";
+
+  public static Builder newBuilder() {
+    return new AutoValue_DatadogEventPublisher.Builder()
+        
.withMaxElapsedMillis(ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME_MILLIS);
+  }
+
+  abstract ApacheHttpTransport transport();
+
+  abstract HttpRequestFactory requestFactory();
+
+  abstract GenericUrl genericUrl();
+
+  abstract String apiKey();
+
+  abstract Integer maxElapsedMillis();
+
+  /**
+   * Executes a POST for the list of {@link DatadogEvent} objects into Datadog's Logs API.
+   *
+   * <p>The request body is gzip-encoded. Unsuccessful responses and I/O errors are each handled
+   * with their own freshly configured exponential backoff (see {@link #getConfiguredBackOff()}).
+   *
+   * @param events List of {@link DatadogEvent}s
+   * @return {@link HttpResponse} for the POST.
+   * @throws IOException if the request cannot be built or executed.
+   */
+  public HttpResponse execute(List<DatadogEvent> events) throws IOException {
+
+    HttpContent content = getContent(events);
+    HttpRequest request = requestFactory().buildPostRequest(genericUrl(), 
content);
+
+    request.setEncoding(new GZipEncoding());
+    request.setUnsuccessfulResponseHandler(
+        new HttpSendLogsUnsuccessfulResponseHandler(getConfiguredBackOff()));
+    request.setIOExceptionHandler(new 
HttpBackOffIOExceptionHandler(getConfiguredBackOff()));
+
+    setHeaders(request, apiKey());
+
+    return request.execute();
+  }
+
+  /**
+   * Same as {@link DatadogEventPublisher#execute(List)} but with a single {@link DatadogEvent},
+   * which is wrapped in a one-element list.
+   *
+   * @param event {@link DatadogEvent} object.
+   * @return {@link HttpResponse} for the POST.
+   * @throws IOException if the request cannot be built or executed.
+   */
+  public HttpResponse execute(DatadogEvent event) throws IOException {
+    return this.execute(ImmutableList.of(event));
+  }
+
+  /**
+   * Builds a new {@link ExponentialBackOff} capped at {@link #maxElapsedMillis()}.
+   *
+   * <p>Only the maximum elapsed time is set explicitly; every other setting keeps the {@link
+   * ExponentialBackOff.Builder} default. A new instance is created on every call.
+   *
+   * @return {@link ExponentialBackOff} object.
+   */
+  @VisibleForTesting
+  protected ExponentialBackOff getConfiguredBackOff() {
+    return new 
ExponentialBackOff.Builder().setMaxElapsedTimeMillis(maxElapsedMillis()).build();
+  }
+
+  /**
+   * Shuts down the transport's connection manager and releases all resources.
+   *
+   * @throws IOException if shutting down the transport fails.
+   */
+  public void close() throws IOException {
+    if (transport() != null) {
+      LOG.info("Closing publisher transport.");
+      transport().shutdown();
+    }
+  }
+
+  /**
+   * Utility method to set http headers into the {@link HttpRequest}.
+   *
+   * <p>Sets the API key header, the origin header (fixed to {@code dataflow}) and a {@code gzip}
+   * content encoding.
+   *
+   * @param request {@link HttpRequest} object to add headers to.
+   * @param apiKey Datadog's Logs API key.
+   */
+  private void setHeaders(HttpRequest request, String apiKey) {
+    request.getHeaders().set(DD_API_KEY_HEADER, apiKey);
+    request.getHeaders().set(DD_ORIGIN_HEADER, DD_ORIGIN_DATAFLOW);
+    request.getHeaders().setContentEncoding("gzip");
+  }
+
+  /**
+   * Utility method to marshall a list of {@link DatadogEvent}s into an {@link HttpContent} object
+   * that can be used to create an {@link HttpRequest}.
+   *
+   * <p>The payload string comes from {@code DatadogEventSerializer.getPayloadString} and is also
+   * logged at debug level.
+   *
+   * @param events List of {@link DatadogEvent}s
+   * @return {@link HttpContent} that can be used to create an {@link HttpRequest}.
+   */
+  @VisibleForTesting
+  protected HttpContent getContent(List<DatadogEvent> events) {
+    String payload = DatadogEventSerializer.getPayloadString(events);
+    LOG.debug("Payload content: {}", payload);
+    return ByteArrayContent.fromString(CONTENT_TYPE, payload);
+  }
+
+  static class HttpSendLogsUnsuccessfulResponseHandler implements 
HttpUnsuccessfulResponseHandler {
+    /*
+      See: https://docs.datadoghq.com/api/latest/logs/#send-logs
+      408: Request Timeout, request should be retried after some time
+      429: Too Many Requests, request should be retried after some time
+
+      handleResponse also retries on any 5xx status. No retry is attempted for any other status,
+      when the request does not support retry, or when the backoff sleep is interrupted.
+    */
+    private static final Set<Integer> RETRYABLE_4XX_CODES = 
ImmutableSet.of(408, 429);
+
+    private final Sleeper sleeper = Sleeper.DEFAULT;
+    private final BackOff backOff;
+
+    HttpSendLogsUnsuccessfulResponseHandler(BackOff backOff) {
+      this.backOff = Preconditions.checkNotNull(backOff);
+    }
+
+    @Override
+    public boolean handleResponse(HttpRequest req, HttpResponse res, boolean 
supportsRetry)
+        throws IOException {
+      if (!supportsRetry) {
+        return false;
+      }
+
+      boolean is5xxStatusCode = res.getStatusCode() / 100 == 5;
+      boolean isRetryable4xxStatusCode = 
RETRYABLE_4XX_CODES.contains(res.getStatusCode());
+      if (is5xxStatusCode || isRetryable4xxStatusCode) {
+        try {
+          return BackOffUtils.next(sleeper, backOff);
+        } catch (InterruptedException exception) {
+          // Mark thread as interrupted since we cannot throw 
InterruptedException here.
+          Thread.currentThread().interrupt();
+        }
+      }
+      return false;
+    }
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+
+    abstract Builder setTransport(ApacheHttpTransport transport);
+
+    abstract ApacheHttpTransport transport();
+
+    abstract Builder setRequestFactory(HttpRequestFactory requestFactory);
+
+    abstract HttpRequestFactory requestFactory();
+
+    abstract Builder setGenericUrl(GenericUrl genericUrl);
+
+    abstract GenericUrl genericUrl();
+
+    abstract Builder setApiKey(String apiKey);
+
+    abstract String apiKey();
+
+    abstract Builder setMaxElapsedMillis(Integer maxElapsedMillis);
+
+    abstract Integer maxElapsedMillis();
+
+    abstract DatadogEventPublisher autoBuild();
+
+    /**
+     * Method to set the Datadog Logs API URL.
+     *
+     * @param url Logs API URL
+     * @return {@link Builder}
+     */
+    public Builder withUrl(String url) throws UnsupportedEncodingException {
+      checkNotNull(url, "withUrl(url) called with null input.");
+      return setGenericUrl(getGenericUrl(url));
+    }
+
+    /**
+     * Method to set the Datadog Logs API key.
+     *
+     * @param apiKey Logs API key.
+     * @return {@link Builder}
+     */
+    public Builder withApiKey(String apiKey) {
+      checkNotNull(apiKey, "withApiKey(apiKey) called with null input.");
+      return setApiKey(apiKey);
+    }
+
+    /**
+     * Method to max timeout for {@link ExponentialBackOff}. Otherwise uses 
the default setting for
+     * {@link ExponentialBackOff}.
+     *
+     * @param maxElapsedMillis max elapsed time in milliseconds for timeout.
+     * @return {@link Builder}
+     */
+    public Builder withMaxElapsedMillis(Integer maxElapsedMillis) {
+      checkNotNull(
+          maxElapsedMillis, "withMaxElapsedMillis(maxElapsedMillis) called 
with null input.");
+      return setMaxElapsedMillis(maxElapsedMillis);
+    }
+
+    /**
+     * Validates and builds a {@link DatadogEventPublisher} object.
+     *
+     * @return {@link DatadogEventPublisher}
+     */
+    public DatadogEventPublisher build() throws NoSuchAlgorithmException, 
KeyManagementException {
+
+      checkNotNull(apiKey(), "API Key needs to be specified via 
withApiKey(apiKey).");
+      checkNotNull(genericUrl(), "URL needs to be specified via 
withUrl(url).");
+
+      CloseableHttpClient httpClient = getHttpClient(DEFAULT_MAX_CONNECTIONS);
+
+      setTransport(new ApacheHttpTransport(httpClient));
+      setRequestFactory(transport().createRequestFactory());
+
+      return autoBuild();
+    }
+
+    /**
+     * Utility method to convert a baseUrl into a {@link GenericUrl}.
+     *
+     * @param baseUrl url pointing to the Logs API endpoint.
+     * @return {@link GenericUrl}
+     */
+    private GenericUrl getGenericUrl(String baseUrl) {
+      String url = Joiner.on('/').join(baseUrl, DD_URL_PATH);
+
+      return new GenericUrl(url);
+    }
+
+    /**
+     * Utility method to create a {@link CloseableHttpClient} to make http 
POSTs against Datadog's
+     * Logs API.
+     */
+    private CloseableHttpClient getHttpClient(int maxConnections)
+        throws NoSuchAlgorithmException, KeyManagementException {
+
+      HttpClientBuilder builder = 
ApacheHttpTransport.newDefaultHttpClientBuilder();
+
+      if (genericUrl().getScheme().equalsIgnoreCase(HTTPS_PROTOCOL_PREFIX)) {
+        LOG.info("SSL connection requested");
+
+        HostnameVerifier hostnameVerifier = new DefaultHostnameVerifier();
+
+        SSLContext sslContext = SSLContextBuilder.create().build();
+
+        SSLConnectionSocketFactory connectionSocketFactory =
+            new SSLConnectionSocketFactory(sslContext, hostnameVerifier);
+        builder.setSSLSocketFactory(connectionSocketFactory);
+      }
+
+      builder.setMaxConnTotal(maxConnections);
+      builder.setDefaultRequestConfig(
+          RequestConfig.custom().setCookieSpec(CookieSpecs.STANDARD).build());
+
+      return builder.build();
+    }
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializer.java
 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializer.java
new file mode 100644
index 00000000000..1a388682729
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Locale;
+
+public class DatadogEventSerializer {
+  private static final Gson GSON =
+      new GsonBuilder().setFieldNamingStrategy(f -> 
f.getName().toLowerCase()).create();
+
+  private DatadogEventSerializer() {}
+
+  /** Utility method to get payload string from a list of {@link 
DatadogEvent}s. */
+  public static String getPayloadString(List<DatadogEvent> events) {
+    return GSON.toJson(events);
+  }
+
+  /** Utility method to get payload string from a {@link DatadogEvent}. */
+  public static String getPayloadString(DatadogEvent event) {
+    return GSON.toJson(event);
+  }
+
+  /** Utility method to get payload size from a string. */
+  public static long getPayloadSize(String payload) {
+    return payload.getBytes(StandardCharsets.UTF_8).length;
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventWriter.java
 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventWriter.java
new file mode 100644
index 00000000000..6de3a1b86e2
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventWriter.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseException;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.InetAddresses;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.InternetDomainName;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link DoFn} to write {@link DatadogEvent}s to Datadog's Logs API.
+ *
+ * <p>Incoming events are buffered in state and sent as one request when the buffered count
+ * reaches the batch count, when adding an event would exceed the maximum buffer size, or when the
+ * flush timer fires. Events that could not be written are emitted as {@link DatadogWriteError}s.
+ */
+@AutoValue
+public abstract class DatadogEventWriter
+    extends DoFn<KV<Integer, DatadogEvent>, DatadogWriteError> {
+
+  private static final Integer MIN_BATCH_COUNT = 10;
+  private static final Integer DEFAULT_BATCH_COUNT = 100;
+  private static final Integer MAX_BATCH_COUNT = 1000;
+  private static final Logger LOG = LoggerFactory.getLogger(DatadogEventWriter.class);
+  // Flush delay in seconds (see the timer offset in processElement).
+  private static final long DEFAULT_FLUSH_DELAY = 2;
+  private static final Long MAX_BUFFER_SIZE = 5L * 1000 * 1000; // 5MB
+  private static final Counter INPUT_COUNTER =
+      Metrics.counter(DatadogEventWriter.class, "inbound-events");
+  private static final Counter SUCCESS_WRITES =
+      Metrics.counter(DatadogEventWriter.class, "outbound-successful-events");
+  private static final Counter FAILED_WRITES =
+      Metrics.counter(DatadogEventWriter.class, "outbound-failed-events");
+  private static final Counter INVALID_REQUESTS =
+      Metrics.counter(DatadogEventWriter.class, "http-invalid-requests");
+  private static final Counter SERVER_ERROR_REQUESTS =
+      Metrics.counter(DatadogEventWriter.class, "http-server-error-requests");
+  private static final Counter VALID_REQUESTS =
+      Metrics.counter(DatadogEventWriter.class, "http-valid-requests");
+  private static final Distribution SUCCESSFUL_WRITE_LATENCY_MS =
+      Metrics.distribution(DatadogEventWriter.class, "successful_write_to_datadog_latency_ms");
+  private static final Distribution UNSUCCESSFUL_WRITE_LATENCY_MS =
+      Metrics.distribution(DatadogEventWriter.class, "unsuccessful_write_to_datadog_latency_ms");
+  private static final Distribution SUCCESSFUL_WRITE_BATCH_SIZE =
+      Metrics.distribution(DatadogEventWriter.class, "write_to_datadog_batch");
+  private static final Distribution SUCCESSFUL_WRITE_PAYLOAD_SIZE =
+      Metrics.distribution(DatadogEventWriter.class, "write_to_datadog_bytes");
+  private static final String BUFFER_STATE_NAME = "buffer";
+  private static final String COUNT_STATE_NAME = "count";
+  private static final String BUFFER_SIZE_STATE_NAME = "buffer_size";
+  private static final String TIME_ID_NAME = "expiry";
+  private static final Pattern URL_PATTERN = Pattern.compile("^http(s?)://([^:]+)(:[0-9]+)?$");
+
+  @VisibleForTesting
+  protected static final String INVALID_URL_FORMAT_MESSAGE =
+      "Invalid url format. Url format should match PROTOCOL://HOST[:PORT], where PORT is optional. "
+          + "Supported Protocols are http and https. eg: http://hostname:8088";
+
+  @StateId(BUFFER_STATE_NAME)
+  private final StateSpec<BagState<DatadogEvent>> buffer = StateSpecs.bag();
+
+  @StateId(COUNT_STATE_NAME)
+  private final StateSpec<ValueState<Long>> count = StateSpecs.value();
+
+  @StateId(BUFFER_SIZE_STATE_NAME)
+  private final StateSpec<ValueState<Long>> bufferSize = StateSpecs.value();
+
+  @TimerId(TIME_ID_NAME)
+  private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  // Effective settings, resolved from the configured values (or defaults) in setup().
+  private Integer batchCount;
+  private Long maxBufferSize;
+  @Nullable private transient DatadogEventPublisher publisher;
+
+  DatadogEventWriter() {
+    this.batchCount = DEFAULT_BATCH_COUNT;
+    this.maxBufferSize = MAX_BUFFER_SIZE;
+    this.publisher = null;
+  }
+
+  /** Creates a {@link Builder} that enforces the default minimum batch count. */
+  public static Builder newBuilder() {
+    return newBuilder(MIN_BATCH_COUNT);
+  }
+
+  /**
+   * Creates a {@link Builder} with a custom minimum batch count.
+   *
+   * @param minBatchCount lower bound for the batch count; the default minimum is used when null.
+   */
+  public static Builder newBuilder(@Nullable Integer minBatchCount) {
+    return new AutoValue_DatadogEventWriter.Builder()
+        .setMinBatchCount(MoreObjects.firstNonNull(minBatchCount, MIN_BATCH_COUNT));
+  }
+
+  @Nullable
+  abstract String url();
+
+  @Nullable
+  abstract String apiKey();
+
+  @Nullable
+  abstract Integer minBatchCount();
+
+  @Nullable
+  abstract Integer inputBatchCount();
+
+  @Nullable
+  abstract Long maxBufferSize();
+
+  /**
+   * Validates the configuration, resolves the effective batch count and buffer size, and creates
+   * the {@link DatadogEventPublisher} used by this instance.
+   *
+   * @throws IllegalArgumentException if the url or API key is missing, the url is malformed, or
+   *     the batch count is outside the allowed range.
+   */
+  @Setup
+  public void setup() {
+
+    final String url = url();
+    if (url == null) {
+      throw new IllegalArgumentException("url is required for writing events.");
+    }
+    checkArgument(isValidUrlFormat(url), INVALID_URL_FORMAT_MESSAGE);
+    final String apiKey = apiKey();
+    if (apiKey == null) {
+      throw new IllegalArgumentException("API Key is required for writing events.");
+    }
+
+    batchCount = MoreObjects.firstNonNull(inputBatchCount(), DEFAULT_BATCH_COUNT);
+    LOG.info("Batch count set to: {}", batchCount);
+
+    maxBufferSize = MoreObjects.firstNonNull(maxBufferSize(), MAX_BUFFER_SIZE);
+    LOG.info("Max buffer size set to: {}", maxBufferSize);
+
+    // Resolve the minimum once so the error message reports the bound that was actually
+    // enforced rather than a possibly-null configured value.
+    final Integer minimumBatchCount = MoreObjects.firstNonNull(minBatchCount(), MIN_BATCH_COUNT);
+    checkArgument(
+        batchCount >= minimumBatchCount,
+        "batchCount must be greater than or equal to %s",
+        minimumBatchCount);
+    checkArgument(
+        batchCount <= MAX_BATCH_COUNT,
+        "batchCount must be less than or equal to %s",
+        MAX_BATCH_COUNT);
+
+    try {
+      DatadogEventPublisher.Builder builder =
+          DatadogEventPublisher.newBuilder().withUrl(url).withApiKey(apiKey);
+
+      publisher = builder.build();
+    } catch (IOException | NoSuchAlgorithmException | KeyManagementException e) {
+      LOG.error("Error creating HttpEventPublisher: ", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Buffers the incoming event and flushes the buffer when it is full.
+   *
+   * <p>An event whose serialized size alone exceeds the maximum buffer size is never buffered; it
+   * is emitted directly as a {@link DatadogWriteError}.
+   */
+  @ProcessElement
+  public void processElement(
+      @Element KV<Integer, DatadogEvent> input,
+      OutputReceiver<DatadogWriteError> receiver,
+      BoundedWindow window,
+      @StateId(BUFFER_STATE_NAME) BagState<DatadogEvent> bufferState,
+      @StateId(COUNT_STATE_NAME) ValueState<Long> countState,
+      @StateId(BUFFER_SIZE_STATE_NAME) ValueState<Long> bufferSizeState,
+      @TimerId(TIME_ID_NAME) Timer timer)
+      throws IOException {
+
+    DatadogEvent event = input.getValue();
+    INPUT_COUNTER.inc();
+
+    String eventPayload = DatadogEventSerializer.getPayloadString(event);
+    long eventPayloadSize = DatadogEventSerializer.getPayloadSize(eventPayload);
+    if (eventPayloadSize > maxBufferSize) {
+      LOG.error(
+          "Error processing event of size {} due to exceeding max buffer size", eventPayloadSize);
+      DatadogWriteError error = DatadogWriteError.newBuilder().withPayload(eventPayload).build();
+      receiver.output(error);
+      return;
+    }
+
+    // (Re)arm the flush timer so buffered events are eventually written even if the batch
+    // never fills up.
+    timer.offset(Duration.standardSeconds(DEFAULT_FLUSH_DELAY)).setRelative();
+
+    long count = MoreObjects.<Long>firstNonNull(countState.read(), 0L);
+    long bufferSize = MoreObjects.<Long>firstNonNull(bufferSizeState.read(), 0L);
+    if (bufferSize + eventPayloadSize > maxBufferSize) {
+      // The new event does not fit: send what is buffered first, then start a fresh batch.
+      LOG.debug("Flushing batch of {} events of size {} due to max buffer size", count, bufferSize);
+      flush(receiver, bufferState, countState, bufferSizeState);
+
+      count = 0L;
+      bufferSize = 0L;
+    }
+
+    bufferState.add(event);
+
+    count = count + 1L;
+    countState.write(count);
+
+    bufferSize = bufferSize + eventPayloadSize;
+    bufferSizeState.write(bufferSize);
+
+    if (count >= batchCount) {
+      LOG.debug("Flushing batch of {} events of size {} due to batch count", count, bufferSize);
+      flush(receiver, bufferState, countState, bufferSizeState);
+    }
+  }
+
+  /** Flushes whatever is still buffered when the flush timer fires. */
+  @OnTimer(TIME_ID_NAME)
+  public void onExpiry(
+      OutputReceiver<DatadogWriteError> receiver,
+      @StateId(BUFFER_STATE_NAME) BagState<DatadogEvent> bufferState,
+      @StateId(COUNT_STATE_NAME) ValueState<Long> countState,
+      @StateId(BUFFER_SIZE_STATE_NAME) ValueState<Long> bufferSizeState)
+      throws IOException {
+
+    long count = MoreObjects.<Long>firstNonNull(countState.read(), 0L);
+    long bufferSize = MoreObjects.<Long>firstNonNull(bufferSizeState.read(), 0L);
+
+    if (count > 0) {
+      LOG.debug("Flushing batch of {} events of size {} due to timer", count, bufferSize);
+      flush(receiver, bufferState, countState, bufferSizeState);
+    }
+  }
+
+  /** Closes the publisher, if one was created; failures to close are logged, not rethrown. */
+  @Teardown
+  public void tearDown() {
+    if (this.publisher != null) {
+      try {
+        this.publisher.close();
+        LOG.info("Successfully closed HttpEventPublisher");
+
+      } catch (IOException e) {
+        LOG.warn("Received exception while closing HttpEventPublisher: ", e);
+      }
+    }
+  }
+
+  /**
+   * Utility method to flush a batch of events via {@link DatadogEventPublisher}.
+   *
+   * <p>Each failed batch is recorded in the failure metrics exactly once and its events are
+   * emitted as {@link DatadogWriteError}s. The buffer, count and size states are always cleared.
+   *
+   * @param receiver Receiver to write {@link DatadogWriteError}s to
+   * @param bufferState buffered events to send
+   * @param countState number of buffered events
+   * @param bufferSizeState accumulated payload size of the buffered events
+   */
+  private void flush(
+      OutputReceiver<DatadogWriteError> receiver,
+      @StateId(BUFFER_STATE_NAME) BagState<DatadogEvent> bufferState,
+      @StateId(COUNT_STATE_NAME) ValueState<Long> countState,
+      @StateId(BUFFER_SIZE_STATE_NAME) ValueState<Long> bufferSizeState)
+      throws IOException {
+
+    if (!bufferState.isEmpty().read()) {
+
+      long count = MoreObjects.firstNonNull(countState.read(), 0L);
+      long bufferSize = MoreObjects.firstNonNull(bufferSizeState.read(), 0L);
+      HttpResponse response = null;
+      List<DatadogEvent> events = Lists.newArrayList(bufferState.read());
+      long startTime = System.nanoTime();
+      try {
+        // Important to close this response to avoid connection leak.
+        response = checkNotNull(publisher).execute(events);
+        if (!response.isSuccessStatusCode()) {
+          int statusCode = response.getStatusCode();
+          recordFailedRequest(count, startTime, statusCode);
+
+          // The body is read best-effort: if reading it threw from here, the generic
+          // IOException handler below would record the same batch as failed a second time
+          // and drop the HTTP status from the emitted errors.
+          logWriteFailures(
+              count, statusCode, readContentQuietly(response), response.getStatusMessage());
+          flushWriteFailures(events, response.getStatusMessage(), statusCode, receiver);
+
+        } else {
+          SUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - startTime));
+          SUCCESS_WRITES.inc(count);
+          VALID_REQUESTS.inc();
+          SUCCESSFUL_WRITE_BATCH_SIZE.update(count);
+          SUCCESSFUL_WRITE_PAYLOAD_SIZE.update(bufferSize);
+
+          LOG.debug("Successfully wrote {} events", count);
+        }
+
+      } catch (HttpResponseException e) {
+        recordFailedRequest(count, startTime, e.getStatusCode());
+
+        logWriteFailures(count, e.getStatusCode(), e.getContent(), e.getStatusMessage());
+        flushWriteFailures(events, e.getStatusMessage(), e.getStatusCode(), receiver);
+
+      } catch (IOException ioe) {
+        // No HTTP status is available for a plain I/O failure.
+        recordFailedRequest(count, startTime, null);
+
+        logWriteFailures(count, 0, ioe.getMessage(), null);
+        flushWriteFailures(events, ioe.getMessage(), null, receiver);
+
+      } finally {
+        // States are cleared regardless of write success or failure since we
+        // write failed events to an output PCollection.
+        bufferState.clear();
+        countState.clear();
+        bufferSizeState.clear();
+
+        // We've observed cases where errors at this point can cause the pipeline to keep retrying
+        // the same events over and over (e.g. from Dataflow Runner's Pub/Sub implementation). Since
+        // the events have either been published or wrapped for error handling, we can safely
+        // ignore this error, though there may or may not be a leak of some type depending on
+        // HttpResponse's implementation. However, any potential leak would still happen if we let
+        // the exception fall through, so this isn't considered a major issue.
+        try {
+          if (response != null) {
+            response.ignore();
+          }
+        } catch (IOException e) {
+          LOG.warn(
+              "Error ignoring response from Datadog. Messages should still have published, but there"
+                  + " might be a connection leak.",
+              e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Records the metrics for one failed request: latency, number of failed events and the request
+   * counter matching the status class.
+   *
+   * @param count number of events in the failed batch
+   * @param startTime {@link System#nanoTime()} value taken just before the request was sent
+   * @param statusCode HTTP status of the failure, or null when the request failed without one; a
+   *     missing status is counted as an invalid request
+   */
+  private void recordFailedRequest(long count, long startTime, @Nullable Integer statusCode) {
+    UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - startTime));
+    FAILED_WRITES.inc(count);
+    if (statusCode == null) {
+      INVALID_REQUESTS.inc();
+    } else if (statusCode >= 400 && statusCode < 500) {
+      INVALID_REQUESTS.inc();
+    } else if (statusCode >= 500 && statusCode < 600) {
+      SERVER_ERROR_REQUESTS.inc();
+    }
+  }
+
+  /**
+   * Reads the response body for logging purposes only.
+   *
+   * @return the body as a string, or null when it could not be read
+   */
+  @Nullable
+  private static String readContentQuietly(HttpResponse response) {
+    try {
+      return response.parseAsString();
+    } catch (IOException e) {
+      LOG.warn("Unable to read the content of the error response from Datadog.", e);
+      return null;
+    }
+  }
+
+  /** Utility method to log write failures. */
+  private void logWriteFailures(
+      long count, int statusCode, @Nullable String content, @Nullable String statusMessage) {
+    LOG.error("Failed to write {} events", count);
+    LOG.error(
+        "Error writing to Datadog. StatusCode: {}, content: {}, StatusMessage: {}",
+        statusCode,
+        content,
+        statusMessage);
+  }
+
+  /**
+   * Utility method to un-batch and flush failed write events.
+   *
+   * @param events List of {@link DatadogEvent}s to un-batch
+   * @param statusMessage Status message to be added to {@link DatadogWriteError}
+   * @param statusCode Status code to be added to {@link DatadogWriteError}
+   * @param receiver Receiver to write {@link DatadogWriteError}s to
+   */
+  private void flushWriteFailures(
+      List<DatadogEvent> events,
+      @Nullable String statusMessage,
+      @Nullable Integer statusCode,
+      OutputReceiver<DatadogWriteError> receiver) {
+
+    checkNotNull(events, "DatadogEvents cannot be null.");
+
+    DatadogWriteError.Builder builder = DatadogWriteError.newBuilder();
+
+    if (statusMessage != null) {
+      builder.withStatusMessage(statusMessage);
+    }
+
+    if (statusCode != null) {
+      builder.withStatusCode(statusCode);
+    }
+
+    // The builder is reused: status fields are shared, only the payload changes per event.
+    for (DatadogEvent event : events) {
+      String payload = DatadogEventSerializer.getPayloadString(event);
+      DatadogWriteError error = builder.withPayload(payload).build();
+      receiver.output(error);
+    }
+  }
+
+  /**
+   * Checks whether the Logs API URL matches the format PROTOCOL://HOST[:PORT].
+   *
+   * @param url for Logs API
+   * @return true if the URL is valid
+   */
+  private static boolean isValidUrlFormat(@Nullable String url) {
+    if (url == null) {
+      return false;
+    }
+    Matcher matcher = URL_PATTERN.matcher(url);
+    if (matcher.find()) {
+      String host = matcher.group(2);
+      if (host == null) {
+        return false;
+      }
+      return InetAddresses.isInetAddress(host) || InternetDomainName.isValid(host);
+    }
+    return false;
+  }
+
+  /**
+   * Converts Nanoseconds to Milliseconds.
+   *
+   * @param ns time in nanoseconds
+   * @return time in milliseconds, rounded to the nearest millisecond
+   */
+  private static long nanosToMillis(long ns) {
+    return Math.round(((double) ns) / 1e6);
+  }
+
+  /** Builder for {@link DatadogEventWriter}. */
+  @AutoValue.Builder
+  abstract static class Builder {
+
+    abstract Builder setUrl(String url);
+
+    abstract String url();
+
+    abstract Builder setApiKey(String apiKey);
+
+    abstract String apiKey();
+
+    abstract Builder setMinBatchCount(Integer minBatchCount);
+
+    abstract Integer minBatchCount();
+
+    abstract Builder setInputBatchCount(@Nullable Integer inputBatchCount);
+
+    abstract Builder setMaxBufferSize(Long maxBufferSize);
+
+    abstract DatadogEventWriter autoBuild();
+
+    /**
+     * Method to set the url for Logs API.
+     *
+     * @param url for Logs API
+     * @return {@link Builder}
+     */
+    public Builder withUrl(String url) {
+      checkArgument(url != null, "withURL(url) called with null input.");
+      checkArgument(isValidUrlFormat(url), INVALID_URL_FORMAT_MESSAGE);
+      return setUrl(url);
+    }
+
+    /**
+     * Method to set the API key for Logs API.
+     *
+     * @param apiKey API key for Logs API
+     * @return {@link Builder}
+     */
+    public Builder withApiKey(String apiKey) {
+      checkArgument(apiKey != null, "withApiKey(apiKey) called with null input.");
+      return setApiKey(apiKey);
+    }
+
+    /**
+     * Method to set the inputBatchCount.
+     *
+     * @param inputBatchCount for batching post requests; when non-null it must lie between the
+     *     minimum batch count and the maximum batch count.
+     * @return {@link Builder}
+     */
+    public Builder withInputBatchCount(@Nullable Integer inputBatchCount) {
+      if (inputBatchCount != null) {
+        checkArgument(
+            inputBatchCount >= MoreObjects.firstNonNull(minBatchCount(), MIN_BATCH_COUNT),
+            "inputBatchCount must be greater than or equal to %s",
+            minBatchCount());
+        checkArgument(
+            inputBatchCount <= MAX_BATCH_COUNT,
+            "inputBatchCount must be less than or equal to %s",
+            MAX_BATCH_COUNT);
+      }
+      return setInputBatchCount(inputBatchCount);
+    }
+
+    /**
+     * Method to set the maxBufferSize.
+     *
+     * @param maxBufferSize for batching post requests; the default maximum is used when null.
+     * @return {@link Builder}
+     */
+    public Builder withMaxBufferSize(@Nullable Long maxBufferSize) {
+      if (maxBufferSize == null) {
+        return setMaxBufferSize(MAX_BUFFER_SIZE);
+      }
+      return setMaxBufferSize(maxBufferSize);
+    }
+
+    /** Build a new {@link DatadogEventWriter} objects based on the configuration. */
+    public DatadogEventWriter build() {
+      checkNotNull(url(), "url needs to be provided.");
+      checkNotNull(apiKey(), "apiKey needs to be provided.");
+
+      return autoBuild();
+    }
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogIO.java
 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogIO.java
new file mode 100644
index 00000000000..fa8b6befaba
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogIO.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link DatadogIO} class provides a {@link PTransform} that allows writing {@link
+ * DatadogEvent} messages into a Datadog Logs API end point.
+ *
+ * <p>Obtain a {@link Write.Builder} through {@link #writeBuilder()}, set at least the url and the
+ * API key, and apply the built {@link Write} to a {@link PCollection} of events.
+ */
+public class DatadogIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DatadogIO.class);
+
+  private DatadogIO() {}
+
+  /** Creates a {@link Write.Builder} without an explicit minimum batch count. */
+  public static Write.Builder writeBuilder() {
+    return writeBuilder(null);
+  }
+
+  /**
+   * Creates a {@link Write.Builder} with the given minimum batch count.
+   *
+   * @param minBatchCount minimum batch count handed to {@link DatadogEventWriter}; may be {@code
+   *     null}
+   * @return a new {@link Write.Builder}
+   */
+  public static Write.Builder writeBuilder(@Nullable Integer minBatchCount) {
+    return new AutoValue_DatadogIO_Write.Builder().setMinBatchCount(minBatchCount);
+  }
+
+  /**
+   * Class {@link Write} provides a {@link PTransform} that allows writing {@link DatadogEvent}
+   * records into a Datadog Logs API end-point using HTTP POST requests. In the event of an error, a
+   * {@link PCollection} of {@link DatadogWriteError} records are returned for further processing or
+   * storing into a deadletter sink.
+   */
+  @AutoValue
+  public abstract static class Write
+      extends PTransform<PCollection<DatadogEvent>, PCollection<DatadogWriteError>> {
+
+    abstract String url();
+
+    abstract String apiKey();
+
+    @Nullable
+    abstract Integer minBatchCount();
+
+    @Nullable
+    abstract Integer batchCount();
+
+    @Nullable
+    abstract Long maxBufferSize();
+
+    @Nullable
+    abstract Integer parallelism();
+
+    /**
+     * Configures a {@link DatadogEventWriter} from this transform's settings, keys the incoming
+     * events and runs the writer over them.
+     *
+     * @param input the events to write
+     * @return the {@link DatadogWriteError} records emitted by the writer
+     */
+    @Override
+    public PCollection<DatadogWriteError> expand(PCollection<DatadogEvent> input) {
+
+      LOG.info("Configuring DatadogEventWriter.");
+      DatadogEventWriter.Builder builder =
+          DatadogEventWriter.newBuilder(minBatchCount())
+              .withMaxBufferSize(maxBufferSize())
+              .withUrl(url())
+              .withInputBatchCount(batchCount())
+              .withApiKey(apiKey());
+
+      DatadogEventWriter writer = builder.build();
+      LOG.info("DatadogEventWriter configured");
+
+      // Return a PCollection<DatadogWriteError>
+      return input
+          .apply("Create KV pairs", CreateKeys.of(parallelism()))
+          .apply("Write Datadog events", ParDo.of(writer))
+          .setCoder(DatadogWriteErrorCoder.of());
+    }
+
+    /** A builder for creating {@link Write} objects. */
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      abstract Builder setUrl(String url);
+
+      abstract String url();
+
+      abstract Builder setApiKey(String apiKey);
+
+      abstract String apiKey();
+
+      abstract Builder setMinBatchCount(@Nullable Integer minBatchCount);
+
+      abstract Builder setBatchCount(Integer batchCount);
+
+      abstract Builder setMaxBufferSize(Long maxBufferSize);
+
+      abstract Builder setParallelism(Integer parallelism);
+
+      abstract Write autoBuild();
+
+      /**
+       * Method to set the url for Logs API.
+       *
+       * @param url for Logs API
+       * @return {@link Builder}
+       * @throws IllegalArgumentException if {@code url} is null
+       */
+      public Builder withUrl(String url) {
+        checkArgument(url != null, "withURL(url) called with null input.");
+        return setUrl(url);
+      }
+
+      /**
+       * Method to set the API key for Logs API.
+       *
+       * @param apiKey API key for Logs API
+       * @return {@link Builder}
+       * @throws IllegalArgumentException if {@code apiKey} is null
+       */
+      public Builder withApiKey(String apiKey) {
+        checkArgument(apiKey != null, "withApiKey(apiKey) called with null input.");
+        return setApiKey(apiKey);
+      }
+
+      /**
+       * Method to set the Batch Count.
+       *
+       * @param batchCount for batching post requests.
+       * @return {@link Builder}
+       * @throws IllegalArgumentException if {@code batchCount} is null
+       */
+      public Builder withBatchCount(Integer batchCount) {
+        checkArgument(batchCount != null, "withBatchCount(batchCount) called with null input.");
+        return setBatchCount(batchCount);
+      }
+
+      /**
+       * Method to set the Max Buffer Size.
+       *
+       * @param maxBufferSize for batching post requests.
+       * @return {@link Builder}
+       * @throws IllegalArgumentException if {@code maxBufferSize} is null
+       */
+      public Builder withMaxBufferSize(Long maxBufferSize) {
+        checkArgument(
+            maxBufferSize != null, "withMaxBufferSize(maxBufferSize) called with null input.");
+        return setMaxBufferSize(maxBufferSize);
+      }
+
+      /**
+       * Method to set the parallelism.
+       *
+       * @param parallelism for controlling the number of http client connections; must be
+       *     positive.
+       * @return {@link Builder}
+       * @throws IllegalArgumentException if {@code parallelism} is null or not positive
+       */
+      public Builder withParallelism(Integer parallelism) {
+        checkArgument(parallelism != null, "withParallelism(parallelism) called with null input.");
+        // The value becomes the bound of ThreadLocalRandom.nextInt(bound), which rejects
+        // non-positive bounds. Fail here instead of on the first element at run time.
+        checkArgument(
+            parallelism > 0,
+            "withParallelism(parallelism) requires a positive value but was %s",
+            parallelism);
+        return setParallelism(parallelism);
+      }
+
+      /**
+       * Builds the {@link Write} transform after checking that url and API key are present.
+       *
+       * @return the configured {@link Write}
+       */
+      public Write build() {
+        // NOTE(review): AutoValue documents that builder getters for unset non-Optional
+        // properties throw IllegalStateException, so these messages may never surface - confirm.
+        checkNotNull(url(), "Logs API url is required.");
+        checkNotNull(apiKey(), "API key is required.");
+
+        return autoBuild();
+      }
+    }
+
+    /**
+     * Pairs every {@link DatadogEvent} with a random integer key.
+     *
+     * <p>NOTE(review): presumably the keys let the downstream writer group events per key - confirm
+     * against {@link DatadogEventWriter}.
+     */
+    private static class CreateKeys
+        extends PTransform<PCollection<DatadogEvent>, PCollection<KV<Integer, DatadogEvent>>> {
+
+      private static final Integer DEFAULT_PARALLELISM = 1;
+
+      @Nullable private final Integer requestedKeys;
+
+      private CreateKeys(@Nullable Integer requestedKeys) {
+        this.requestedKeys = requestedKeys;
+      }
+
+      static CreateKeys of(@Nullable Integer requestedKeys) {
+        return new CreateKeys(requestedKeys);
+      }
+
+      @Override
+      public PCollection<KV<Integer, DatadogEvent>> expand(PCollection<DatadogEvent> input) {
+
+        return input
+            .apply("Inject Keys", ParDo.of(new CreateKeysFn(this.requestedKeys)))
+            .setCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of()));
+      }
+
+      /** Emits each element keyed by a random integer in {@code [0, parallelism)}. */
+      private static class CreateKeysFn extends DoFn<DatadogEvent, KV<Integer, DatadogEvent>> {
+
+        private final Integer calculatedParallelism;
+
+        CreateKeysFn(@Nullable Integer specifiedParallelism) {
+          // Fall back to a single key when the caller did not request a parallelism.
+          this.calculatedParallelism =
+              MoreObjects.firstNonNull(specifiedParallelism, DEFAULT_PARALLELISM);
+          LOG.info("Parallelism set to: {}", calculatedParallelism);
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext context) {
+          context.output(
+              KV.of(ThreadLocalRandom.current().nextInt(calculatedParallelism), context.element()));
+        }
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteError.java
 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteError.java
new file mode 100644
index 00000000000..977873718c6
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteError.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+
+/**
+ * A class for capturing errors writing {@link DatadogEvent}s to Datadog's Logs API.
+ *
+ * <p>All three properties are nullable, so an instance may carry any subset of them.
+ */
+@AutoValue
+public abstract class DatadogWriteError {
+
+  /** Returns a new {@link Builder} backed by the AutoValue-generated implementation. */
+  public static Builder newBuilder() {
+    return new AutoValue_DatadogWriteError.Builder();
+  }
+
+  /** Status code recorded for the failed write, or {@code null} when none was set. */
+  @Nullable
+  public abstract Integer statusCode();
+
+  /** Status message recorded for the failed write, or {@code null} when none was set. */
+  @Nullable
+  public abstract String statusMessage();
+
+  /** Payload recorded for the failed write, or {@code null} when none was set. */
+  @Nullable
+  public abstract String payload();
+
+  /** Builder for {@link DatadogWriteError}; the {@code with*} methods reject null arguments. */
+  @AutoValue.Builder
+  abstract static class Builder {
+
+    abstract Builder setStatusCode(Integer statusCode);
+
+    abstract Integer statusCode();
+
+    abstract Builder setStatusMessage(String statusMessage);
+
+    abstract Builder setPayload(String payload);
+
+    abstract DatadogWriteError autoBuild();
+
+    /** Sets the status code; throws {@link NullPointerException} on null input. */
+    public Builder withStatusCode(Integer statusCode) {
+      checkNotNull(statusCode, "withStatusCode(statusCode) called with null 
input.");
+
+      return setStatusCode(statusCode);
+    }
+
+    /** Sets the status message; throws {@link NullPointerException} on null input. */
+    public Builder withStatusMessage(String statusMessage) {
+      checkNotNull(statusMessage, "withStatusMessage(statusMessage) called 
with null input.");
+
+      return setStatusMessage(statusMessage);
+    }
+
+    /** Sets the payload; throws {@link NullPointerException} on null input. */
+    public Builder withPayload(String payload) {
+      checkNotNull(payload, "withPayload(payload) called with null input.");
+
+      return setPayload(payload);
+    }
+
+    /** Builds the {@link DatadogWriteError} without further validation. */
+    public DatadogWriteError build() {
+      return autoBuild();
+    }
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoder.java
 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoder.java
new file mode 100644
index 00000000000..a634c798518
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link org.apache.beam.sdk.coders.Coder} for {@link DatadogWriteError} objects.
+ *
+ * <p>The three properties are written in the order status code, status message, payload, each
+ * through a {@link NullableCoder} so that absent values round-trip as {@code null}.
+ */
+public class DatadogWriteErrorCoder extends AtomicCoder<DatadogWriteError> {
+
+  // Single shared instance handed out by of().
+  private static final DatadogWriteErrorCoder DATADOG_WRITE_ERROR_CODER =
+      new DatadogWriteErrorCoder();
+
+  private static final TypeDescriptor<DatadogWriteError> TYPE_DESCRIPTOR =
+      new TypeDescriptor<DatadogWriteError>() {};
+  private static final StringUtf8Coder STRING_UTF_8_CODER = 
StringUtf8Coder.of();
+  private static final NullableCoder<String> STRING_NULLABLE_CODER =
+      NullableCoder.of(STRING_UTF_8_CODER);
+  private static final NullableCoder<Integer> INTEGER_NULLABLE_CODER =
+      NullableCoder.of(BigEndianIntegerCoder.of());
+
+  /** Returns the shared {@link DatadogWriteErrorCoder} instance. */
+  public static DatadogWriteErrorCoder of() {
+    return DATADOG_WRITE_ERROR_CODER;
+  }
+
+  /** Writes status code, status message and payload, in that order, to {@code out}. */
+  @Override
+  public void encode(DatadogWriteError value, OutputStream out) throws 
CoderException, IOException {
+    INTEGER_NULLABLE_CODER.encode(value.statusCode(), out);
+    STRING_NULLABLE_CODER.encode(value.statusMessage(), out);
+    STRING_NULLABLE_CODER.encode(value.payload(), out);
+  }
+
+  /**
+   * Reads the properties back in the order used by {@code encode} and rebuilds the error.
+   *
+   * <p>A property decoded as {@code null} is left unset, because the builder's {@code with*}
+   * methods reject null arguments.
+   */
+  @Override
+  public DatadogWriteError decode(InputStream in) throws CoderException, 
IOException {
+
+    DatadogWriteError.Builder builder = DatadogWriteError.newBuilder();
+
+    Integer statusCode = INTEGER_NULLABLE_CODER.decode(in);
+    if (statusCode != null) {
+      builder.withStatusCode(statusCode);
+    }
+
+    String statusMessage = STRING_NULLABLE_CODER.decode(in);
+    if (statusMessage != null) {
+      builder.withStatusMessage(statusMessage);
+    }
+
+    String payload = STRING_NULLABLE_CODER.decode(in);
+    if (payload != null) {
+      builder.withPayload(payload);
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public TypeDescriptor<DatadogWriteError> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
+  /** Always reports this coder as non-deterministic by throwing. */
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    throw new NonDeterministicException(
+        this, "DatadogWriteError can hold arbitrary instances, which may be 
non-deterministic.");
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/package-info.java
 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/package-info.java
new file mode 100644
index 00000000000..fbeed9f1a55
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for writing to <a href="https://www.datadoghq.com/">Datadog</a>.
+ *
+ * <p>The {@link org.apache.beam.sdk.io.datadog.DatadogIO} class provides a 
{@link
+ * org.apache.beam.sdk.transforms.PTransform} that allows writing data to the 
Datadog Logs API.
+ *
+ * <p>For more information on the Datadog Logs API, see the <a
+ * href="https://docs.datadoghq.com/api/latest/logs/">official documentation</a>.
+ */
+package org.apache.beam.sdk.io.datadog;
diff --git 
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventCoderTest.java
 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventCoderTest.java
new file mode 100644
index 00000000000..f1dad0784af
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventCoderTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.junit.Test;
+
+/** Unit tests for {@link DatadogEventCoder} class. */
+public class DatadogEventCoderTest {
+
+  /**
+   * Test whether {@link DatadogEventCoder} is able to encode/decode a {@link DatadogEvent}
+   * correctly.
+   *
+   * @throws IOException if the coder or the in-memory streams raise an I/O error
+   */
+  @Test
+  public void testEncodeDecode() throws IOException {
+
+    String source = "test-source";
+    String tags = "test-tags";
+    String hostname = "test-hostname";
+    String service = "test-service";
+    String message = "test-message";
+
+    DatadogEvent actualEvent =
+        DatadogEvent.newBuilder()
+            .withSource(source)
+            .withTags(tags)
+            .withHostname(hostname)
+            .withService(service)
+            .withMessage(message)
+            .build();
+
+    DatadogEventCoder coder = DatadogEventCoder.of();
+    // Round-trip through in-memory streams and compare with the original event.
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+      coder.encode(actualEvent, bos);
+      try (ByteArrayInputStream bin = new 
ByteArrayInputStream(bos.toByteArray())) {
+        DatadogEvent decodedEvent = coder.decode(bin);
+        assertThat(decodedEvent, is(equalTo(actualEvent)));
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisherTest.java
 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisherTest.java
new file mode 100644
index 00000000000..17f6e7a6e15
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisherTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpContent;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.util.ExponentialBackOff;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import org.mockserver.configuration.ConfigurationProperties;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.MediaType;
+import org.mockserver.verify.VerificationTimes;
+
+/** Unit tests for {@link DatadogEventPublisher} class. */
+public class DatadogEventPublisherTest {
+
+  private static final String EXPECTED_PATH = "/" + DatadogEventPublisher.DD_URL_PATH;
+
+  private static final DatadogEvent DATADOG_TEST_EVENT_1 =
+      DatadogEvent.newBuilder()
+          .withSource("test-source-1")
+          .withTags("test-tags-1")
+          .withHostname("test-hostname-1")
+          .withService("test-service-1")
+          .withMessage("test-message-1")
+          .build();
+
+  private static final DatadogEvent DATADOG_TEST_EVENT_2 =
+      DatadogEvent.newBuilder()
+          .withSource("test-source-2")
+          .withTags("test-tags-2")
+          .withHostname("test-hostname-2")
+          .withService("test-service-2")
+          .withMessage("test-message-2")
+          .build();
+
+  private static final List<DatadogEvent> DATADOG_EVENTS =
+      ImmutableList.of(DATADOG_TEST_EVENT_1, DATADOG_TEST_EVENT_2);
+
+  /** Test whether {@link HttpContent} is created from the list of {@link DatadogEvent}s. */
+  @Test
+  public void contentTest() throws NoSuchAlgorithmException, KeyManagementException, IOException {
+
+    DatadogEventPublisher publisher =
+        DatadogEventPublisher.newBuilder()
+            .withUrl("http://example.com")
+            .withApiKey("test-api-key")
+            .build();
+
+    String expectedString =
+        "["
+            + "{\"ddsource\":\"test-source-1\",\"ddtags\":\"test-tags-1\","
+            + "\"hostname\":\"test-hostname-1\",\"service\":\"test-service-1\","
+            + "\"message\":\"test-message-1\"},"
+            + "{\"ddsource\":\"test-source-2\",\"ddtags\":\"test-tags-2\","
+            + "\"hostname\":\"test-hostname-2\",\"service\":\"test-service-2\","
+            + "\"message\":\"test-message-2\"}"
+            + "]";
+
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+      HttpContent actualContent = publisher.getContent(DATADOG_EVENTS);
+      actualContent.writeTo(bos);
+      String actualString = new String(bos.toByteArray(), StandardCharsets.UTF_8);
+      assertThat(actualString, is(equalTo(expectedString)));
+    }
+  }
+
+  /** Test whether the builder appends the Logs API path to the configured base url. */
+  @Test
+  public void genericURLTest() throws IOException {
+
+    String baseURL = "http://example.com";
+    DatadogEventPublisher.Builder builder =
+        DatadogEventPublisher.newBuilder().withUrl(baseURL).withApiKey("test-api-key");
+
+    assertThat(
+        builder.genericUrl(),
+        is(equalTo(new GenericUrl(Joiner.on('/').join(baseURL, "api/v2/logs")))));
+  }
+
+  /** Test whether the back-off uses the library default when no max elapsed time is given. */
+  @Test
+  public void configureBackOffDefaultTest()
+      throws NoSuchAlgorithmException, KeyManagementException, IOException {
+
+    DatadogEventPublisher publisherDefaultBackOff =
+        DatadogEventPublisher.newBuilder()
+            .withUrl("http://example.com")
+            .withApiKey("test-api-key")
+            .build();
+
+    assertThat(
+        publisherDefaultBackOff.getConfiguredBackOff().getMaxElapsedTimeMillis(),
+        is(equalTo(ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME_MILLIS)));
+  }
+
+  /** Test whether a custom max elapsed time is carried into the configured back-off. */
+  @Test
+  public void configureBackOffCustomTest()
+      throws NoSuchAlgorithmException, KeyManagementException, IOException {
+
+    int timeoutInMillis = 600000; // 10 minutes
+    DatadogEventPublisher publisherWithBackOff =
+        DatadogEventPublisher.newBuilder()
+            .withUrl("http://example.com")
+            .withApiKey("test-api-key")
+            .withMaxElapsedMillis(timeoutInMillis)
+            .build();
+
+    assertThat(
+        publisherWithBackOff.getConfiguredBackOff().getMaxElapsedTimeMillis(),
+        is(equalTo(timeoutInMillis)));
+  }
+
+  /** Test whether a published request carries the expected path, content type and headers. */
+  @Test
+  public void requestHeadersTest() throws Exception {
+    ConfigurationProperties.disableSystemOut(true);
+    try (ClientAndServer mockServer = startClientAndServer()) {
+      mockServer
+          .when(org.mockserver.model.HttpRequest.request(EXPECTED_PATH))
+          .respond(org.mockserver.model.HttpResponse.response().withStatusCode(202));
+
+      DatadogEventPublisher publisher =
+          DatadogEventPublisher.newBuilder()
+              .withUrl(Joiner.on(':').join("http://localhost", mockServer.getPort()))
+              .withApiKey("test-api-key")
+              .build();
+
+      DatadogEvent event =
+          DatadogEvent.newBuilder()
+              .withSource("test-source-1")
+              .withTags("test-tags-1")
+              .withHostname("test-hostname-1")
+              .withService("test-service-1")
+              .withMessage("test-message-1")
+              .build();
+
+      HttpResponse response = publisher.execute(ImmutableList.of(event));
+      assertThat(response.getStatusCode(), is(equalTo(202)));
+
+      mockServer.verify(
+          org.mockserver.model.HttpRequest.request(EXPECTED_PATH)
+              .withContentType(MediaType.APPLICATION_JSON)
+              .withHeader("dd-api-key", "test-api-key")
+              .withHeader("dd-evp-origin", "dataflow")
+              .withHeader("Accept-Encoding", "gzip"),
+          VerificationTimes.once());
+    }
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializerTest.java
 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializerTest.java
new file mode 100644
index 00000000000..15b127da2f0
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializerTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.List;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+/** Unit tests for {@link DatadogEventSerializer} class. */
+public class DatadogEventSerializerTest {
+
+  private static final DatadogEvent DATADOG_TEST_EVENT_1 =
+      DatadogEvent.newBuilder()
+          .withSource("test-source-1")
+          .withTags("test-tags-1")
+          .withHostname("test-hostname-1")
+          .withService("test-service-1")
+          .withMessage("test-message-1")
+          .build();
+
+  private static final DatadogEvent DATADOG_TEST_EVENT_2 =
+      DatadogEvent.newBuilder()
+          .withSource("test-source-2")
+          .withTags("test-tags-2")
+          .withHostname("test-hostname-2")
+          .withService("test-service-2")
+          .withMessage("test-message-2")
+          .build();
+
+  private static final List<DatadogEvent> DATADOG_EVENTS =
+      ImmutableList.of(DATADOG_TEST_EVENT_1, DATADOG_TEST_EVENT_2);
+
+  /** Test whether a list of events is stringified as a JSON array. */
+  @Test
+  public void stringPayloadTest_list() {
+    String actual = DatadogEventSerializer.getPayloadString(DATADOG_EVENTS);
+
+    String expected =
+        "["
+            + "{\"ddsource\":\"test-source-1\",\"ddtags\":\"test-tags-1\","
+            + "\"hostname\":\"test-hostname-1\",\"service\":\"test-service-1\","
+            + "\"message\":\"test-message-1\"},"
+            + "{\"ddsource\":\"test-source-2\",\"ddtags\":\"test-tags-2\","
+            + "\"hostname\":\"test-hostname-2\",\"service\":\"test-service-2\","
+            + "\"message\":\"test-message-2\"}"
+            + "]";
+
+    // Hamcrest takes the actual value first; the reversed order mislabels failure output.
+    assertThat(actual, is(equalTo(expected)));
+  }
+
+  /** Test whether a single event is stringified as a JSON object. */
+  @Test
+  public void stringPayloadTest_single() {
+    String actual = DatadogEventSerializer.getPayloadString(DATADOG_TEST_EVENT_1);
+
+    String expected =
+        "{\"ddsource\":\"test-source-1\",\"ddtags\":\"test-tags-1\","
+            + "\"hostname\":\"test-hostname-1\",\"service\":\"test-service-1\","
+            + "\"message\":\"test-message-1\"}";
+
+    assertThat(actual, is(equalTo(expected)));
+  }
+
+  /** Test payload size calculation for a payload string. */
+  @Test
+  public void stringPayloadSizeTest() {
+    long actual =
+        DatadogEventSerializer.getPayloadSize(
+            "{\"ddsource\":\"test-source-1\",\"ddtags\":\"test-tags-1\","
+                + "\"hostname\":\"test-hostname-1\",\"service\":\"test-service-1\","
+                + "\"message\":\"test-message-1\"}");
+
+    long expected = 134L;
+
+    assertThat(actual, is(equalTo(expected)));
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventTest.java
 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventTest.java
new file mode 100644
index 00000000000..de1759faafb
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.junit.Test;
+
+/** Unit tests for the {@link DatadogEvent} class. */
+public class DatadogEventTest {
+
+  /**
+   * Checks that two {@link DatadogEvent}s built from identical field values compare equal, and
+   * that an event differing only in its message does not.
+   */
+  @Test
+  public void testEquals() {
+    String sourceValue = "test-source";
+    String tagsValue = "test-tags";
+    String hostnameValue = "test-hostname";
+    String serviceValue = "test-service";
+    String messageValue = "test-message";
+
+    DatadogEvent actualEvent =
+        DatadogEvent.newBuilder()
+            .withSource(sourceValue)
+            .withTags(tagsValue)
+            .withHostname(hostnameValue)
+            .withService(serviceValue)
+            .withMessage(messageValue)
+            .build();
+
+    // Same field values as actualEvent, built independently.
+    DatadogEvent identicalEvent =
+        DatadogEvent.newBuilder()
+            .withSource(sourceValue)
+            .withTags(tagsValue)
+            .withHostname(hostnameValue)
+            .withService(serviceValue)
+            .withMessage(messageValue)
+            .build();
+    assertThat(actualEvent, is(equalTo(identicalEvent)));
+
+    // Differs from actualEvent only in the message field.
+    DatadogEvent otherMessageEvent =
+        DatadogEvent.newBuilder()
+            .withSource(sourceValue)
+            .withTags(tagsValue)
+            .withHostname(hostnameValue)
+            .withService(serviceValue)
+            .withMessage("a-different-test-message")
+            .build();
+    assertThat(actualEvent, is(not(equalTo(otherMessageEvent))));
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventWriterTest.java
 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventWriterTest.java
new file mode 100644
index 00000000000..086bb93f53e
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventWriterTest.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockserver.configuration.ConfigurationProperties;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.matchers.Times;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.verify.VerificationTimes;
+
+/** Unit tests for {@link DatadogEventWriter} class. */
+public class DatadogEventWriterTest {
+
+  // Request path the mock server matches on: "/" followed by the publisher's DD_URL_PATH.
+  private static final String EXPECTED_PATH = "/" + DatadogEventPublisher.DD_URL_PATH;
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  // A MockServer ClientAndServer simulates the Datadog API server; it is started in setup()
+  // and stopped in tearDown().
+  private ClientAndServer mockServer;
+
+  /**
+   * Starts a fresh mock server before each test and stores it in {@code mockServer}.
+   *
+   * <p>NOTE(review): {@code disableSystemOut(true)} presumably silences MockServer's console
+   * logging; confirm against the MockServer configuration documentation.
+   */
+  @Before
+  public void setup() {
+    ConfigurationProperties.disableSystemOut(true);
+    mockServer = startClientAndServer();
+  }
+
+  /** Stops the mock server after each test, if one was started. */
+  @After
+  public void tearDown() {
+    // Guard against setup() having failed before the server was assigned.
+    if (mockServer != null) {
+      mockServer.stop();
+    }
+  }
+
+  /** Building a {@link DatadogEventWriter} without a URL must fail with a descriptive NPE. */
+  @Test
+  public void eventWriterMissingURL() {
+    NullPointerException missingUrl =
+        assertThrows(NullPointerException.class, () -> DatadogEventWriter.newBuilder().build());
+
+    assertThat(missingUrl).hasMessageThat().contains("url needs to be provided");
+  }
+
+  /** Building a {@link DatadogEventWriter} from a URL with no protocol must be rejected. */
+  @Test
+  public void eventWriterMissingURLProtocol() {
+    String urlWithoutProtocol = "test-url";
+
+    IllegalArgumentException rejected =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> DatadogEventWriter.newBuilder().withUrl(urlWithoutProtocol).build());
+
+    assertThat(rejected).hasMessageThat().contains(DatadogEventWriter.INVALID_URL_FORMAT_MESSAGE);
+  }
+
+  /**
+   * Test building {@link DatadogEventWriter} with an invalid URL; the builder is expected to
+   * reject it with the invalid-URL-format message.
+   */
+  @Test
+  public void eventWriterInvalidURL() {
+
+    Exception thrown =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> DatadogEventWriter.newBuilder().withUrl("http://1.2.3").build());
+
+    assertThat(thrown).hasMessageThat().contains(DatadogEventWriter.INVALID_URL_FORMAT_MESSAGE);
+  }
+
+  /**
+   * Test building {@link DatadogEventWriter} with the 'api/v2/logs' path appended to the URL; the
+   * builder is expected to reject it with the invalid-URL-format message.
+   */
+  @Test
+  public void eventWriterFullEndpoint() {
+
+    Exception thrown =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                DatadogEventWriter.newBuilder()
+                    .withUrl("http://test-url:8088/api/v2/logs")
+                    .build());
+
+    assertThat(thrown).hasMessageThat().contains(DatadogEventWriter.INVALID_URL_FORMAT_MESSAGE);
+  }
+
+  /** Test building {@link DatadogEventWriter} with a URL but no API key. */
+  @Test
+  public void eventWriterMissingToken() {
+
+    Exception thrown =
+        assertThrows(
+            NullPointerException.class,
+            () -> DatadogEventWriter.newBuilder().withUrl("http://test-url").build());
+
+    assertThat(thrown).hasMessageThat().contains("apiKey needs to be provided");
+  }
+
+  /**
+   * Test building {@link DatadogEventWriter} with default batch count: when no batch count is set
+   * on the builder, {@code inputBatchCount()} is expected to be null.
+   */
+  @Test
+  public void eventWriterDefaultBatchCount() {
+
+    DatadogEventWriter writer =
+        DatadogEventWriter.newBuilder()
+            .withUrl("http://test-url")
+            .withApiKey("test-api-key")
+            .build();
+
+    assertThat(writer.inputBatchCount()).isNull();
+  }
+
+  /**
+   * Test building {@link DatadogEventWriter} with a batchCount less than the configured minimum.
+   *
+   * <p>The minimum (7) is the value passed to {@code newBuilder}; a batch count of 6 must be
+   * rejected.
+   */
+  @Test
+  public void eventWriterBatchCountTooSmall() {
+
+    Exception thrown =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                DatadogEventWriter.newBuilder(7)
+                    .withUrl("http://test-url")
+                    .withApiKey("test-api-key")
+                    .withInputBatchCount(6)
+                    .build());
+
+    assertThat(thrown)
+        .hasMessageThat()
+        .contains("inputBatchCount must be greater than or equal to 7");
+  }
+
+  /** Test building {@link DatadogEventWriter} with a batchCount greater than 1000. */
+  @Test
+  public void eventWriterBatchCountTooBig() {
+
+    Exception thrown =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                DatadogEventWriter.newBuilder()
+                    .withUrl("http://test-url")
+                    .withApiKey("test-api-key")
+                    .withInputBatchCount(1001)
+                    .build());
+
+    assertThat(thrown)
+        .hasMessageThat()
+        .contains("inputBatchCount must be less than or equal to 1000");
+  }
+
+  /**
+   * Test building {@link DatadogEventWriter} with a custom batchCount: the configured value must
+   * be returned by {@code inputBatchCount()}.
+   */
+  @Test
+  public void eventWriterCustomBatchCountAndValidation() {
+
+    Integer batchCount = 30;
+    DatadogEventWriter writer =
+        DatadogEventWriter.newBuilder()
+            .withUrl("http://test-url")
+            .withApiKey("test-api-key")
+            .withInputBatchCount(batchCount)
+            .build();
+
+    assertThat(writer.inputBatchCount()).isEqualTo(batchCount);
+  }
+
+  /**
+   * Test building {@link DatadogEventWriter} with default maxBufferSize: when no buffer size is
+   * set on the builder, {@code maxBufferSize()} is expected to be null.
+   */
+  @Test
+  public void eventWriterDefaultMaxBufferSize() {
+
+    DatadogEventWriter writer =
+        DatadogEventWriter.newBuilder()
+            .withUrl("http://test-url")
+            .withApiKey("test-api-key")
+            .build();
+
+    assertThat(writer.maxBufferSize()).isNull();
+  }
+
+  /**
+   * Test building {@link DatadogEventWriter} with a custom maxBufferSize: the configured value
+   * must be returned by {@code maxBufferSize()}.
+   */
+  @Test
+  public void eventWriterCustomMaxBufferSizeAndValidation() {
+
+    Long maxBufferSize = 1_427_841L;
+    DatadogEventWriter writer =
+        DatadogEventWriter.newBuilder()
+            .withUrl("http://test-url")
+            .withMaxBufferSize(maxBufferSize)
+            .withApiKey("test-api-key")
+            .build();
+
+    assertThat(writer.maxBufferSize()).isEqualTo(maxBufferSize);
+  }
+
+  /**
+   * Test successful POST request for single batch: with an input batch count of 1, each event is
+   * expected to be sent in its own request and no write errors are emitted.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void successfulDatadogWriteSingleBatchTest() {
+
+    // Create server expectation for success.
+    addRequestExpectation(202);
+
+    int testPort = mockServer.getPort();
+
+    // Both events share the same key (123).
+    List<KV<Integer, DatadogEvent>> testEvents =
+        ImmutableList.of(
+            KV.of(
+                123,
+                DatadogEvent.newBuilder()
+                    .withSource("test-source-1")
+                    .withTags("test-tags-1")
+                    .withHostname("test-hostname-1")
+                    .withService("test-service-1")
+                    .withMessage("test-message-1")
+                    .build()),
+            KV.of(
+                123,
+                DatadogEvent.newBuilder()
+                    .withSource("test-source-2")
+                    .withTags("test-tags-2")
+                    .withHostname("test-hostname-2")
+                    .withService("test-service-2")
+                    .withMessage("test-message-2")
+                    .build()));
+
+    PCollection<DatadogWriteError> actual =
+        pipeline
+            .apply(
+                "Create Input data",
+                Create.of(testEvents)
+                    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of())))
+            .apply(
+                "DatadogEventWriter",
+                ParDo.of(
+                    DatadogEventWriter.newBuilder(1)
+                        .withUrl(Joiner.on(':').join("http://localhost", testPort))
+                        .withInputBatchCount(1) // Test one request per DatadogEvent
+                        .withApiKey("test-api-key")
+                        .build()))
+            .setCoder(DatadogWriteErrorCoder.of());
+
+    // All successful responses.
+    PAssert.that(actual).empty();
+
+    pipeline.run();
+
+    // Server received exactly the expected number of POST requests.
+    mockServer.verify(
+        HttpRequest.request(EXPECTED_PATH), VerificationTimes.exactly(testEvents.size()));
+  }
+
+  /**
+   * Test successful POST request for multi batch: with an input batch count equal to the number
+   * of events, all events are expected to be sent in one request and no write errors are emitted.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void successfulDatadogWriteMultiBatchTest() {
+
+    // Create server expectation for success.
+    addRequestExpectation(202);
+
+    int testPort = mockServer.getPort();
+
+    // Both events share the same key (123).
+    List<KV<Integer, DatadogEvent>> testEvents =
+        ImmutableList.of(
+            KV.of(
+                123,
+                DatadogEvent.newBuilder()
+                    .withSource("test-source-1")
+                    .withTags("test-tags-1")
+                    .withHostname("test-hostname-1")
+                    .withService("test-service-1")
+                    .withMessage("test-message-1")
+                    .build()),
+            KV.of(
+                123,
+                DatadogEvent.newBuilder()
+                    .withSource("test-source-2")
+                    .withTags("test-tags-2")
+                    .withHostname("test-hostname-2")
+                    .withService("test-service-2")
+                    .withMessage("test-message-2")
+                    .build()));
+
+    PCollection<DatadogWriteError> actual =
+        pipeline
+            .apply(
+                "Create Input data",
+                Create.of(testEvents)
+                    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of())))
+            .apply(
+                "DatadogEventWriter",
+                ParDo.of(
+                    DatadogEventWriter.newBuilder(1)
+                        .withUrl(Joiner.on(':').join("http://localhost", testPort))
+                        .withInputBatchCount(testEvents.size()) // all requests in a single batch.
+                        .withApiKey("test-api-key")
+                        .build()))
+            .setCoder(DatadogWriteErrorCoder.of());
+
+    // All successful responses.
+    PAssert.that(actual).empty();
+
+    pipeline.run();
+
+    // Server received exactly one POST request.
+    mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.once());
+  }
+
+  /**
+   * Test successful POST requests for batch exceeding max buffer size.
+   *
+   * <p>Three events whose payloads are sized to 50 each are written with a max buffer size of
+   * 100, so the batch is expected to be split across two requests.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void successfulDatadogWriteExceedingMaxBufferSize() {
+
+    // Create server expectation for success.
+    addRequestExpectation(202);
+
+    int testPort = mockServer.getPort();
+
+    // Size of the JSON envelope around an empty message.
+    String payloadFormat = "{\"message\":\"%s\"}";
+    long jsonSize = DatadogEventSerializer.getPayloadSize(String.format(payloadFormat, ""));
+
+    long maxBufferSize = 100;
+    long msgSize = 50;
+
+    // Pad the message so that envelope + message adds up to msgSize.
+    char[] bunchOfAs = new char[(int) (msgSize - jsonSize)];
+    Arrays.fill(bunchOfAs, 'a');
+
+    List<KV<Integer, DatadogEvent>> testEvents = new ArrayList<>();
+    for (int i = 1; i <= 3; i++) {
+      testEvents.add(
+          KV.of(123, DatadogEvent.newBuilder().withMessage(new String(bunchOfAs)).build()));
+    }
+
+    PCollection<DatadogWriteError> actual =
+        pipeline
+            .apply(
+                "Create Input data",
+                Create.of(testEvents)
+                    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of())))
+            .apply(
+                "DatadogEventWriter",
+                ParDo.of(
+                    DatadogEventWriter.newBuilder(1)
+                        .withUrl(Joiner.on(':').join("http://localhost", testPort))
+                        .withInputBatchCount(testEvents.size())
+                        .withMaxBufferSize(maxBufferSize)
+                        .withApiKey("test-api-key")
+                        .build()))
+            .setCoder(DatadogWriteErrorCoder.of());
+
+    // All successful responses.
+    PAssert.that(actual).empty();
+
+    pipeline.run();
+
+    // Server received exactly two POST requests:
+    // 1st batch of size=2 due to next msg exceeding max buffer size
+    // 2nd batch of size=1 due to timer
+    mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.exactly(2));
+  }
+
+  /**
+   * Test failed POST request: a 404 response is expected to produce a single {@link
+   * DatadogWriteError} carrying the status code, status message and the event payload.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void failedDatadogWriteSingleBatchTest() {
+
+    // Create server expectation for FAILURE.
+    addRequestExpectation(404);
+
+    int testPort = mockServer.getPort();
+
+    List<KV<Integer, DatadogEvent>> testEvents =
+        ImmutableList.of(
+            KV.of(
+                123,
+                DatadogEvent.newBuilder()
+                    .withSource("test-source-1")
+                    .withTags("test-tags-1")
+                    .withHostname("test-hostname-1")
+                    .withService("test-service-1")
+                    .withMessage("test-message-1")
+                    .build()));
+
+    PCollection<DatadogWriteError> actual =
+        pipeline
+            .apply(
+                "Create Input data",
+                Create.of(testEvents)
+                    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of())))
+            .apply(
+                "DatadogEventWriter",
+                ParDo.of(
+                    DatadogEventWriter.newBuilder(1)
+                        .withUrl(Joiner.on(':').join("http://localhost", testPort))
+                        .withInputBatchCount(testEvents.size()) // all requests in a single batch.
+                        .withApiKey("test-api-key")
+                        .build()))
+            .setCoder(DatadogWriteErrorCoder.of());
+
+    // Expect a single 404 Not found DatadogWriteError
+    PAssert.that(actual)
+        .containsInAnyOrder(
+            DatadogWriteError.newBuilder()
+                .withStatusCode(404)
+                .withStatusMessage("Not Found")
+                .withPayload(
+                    "{\"ddsource\":\"test-source-1\","
+                        + "\"ddtags\":\"test-tags-1\",\"hostname\":\"test-hostname-1\","
+                        + "\"service\":\"test-service-1\",\"message\":\"test-message-1\"}")
+                .build());
+
+    pipeline.run();
+
+    // Server received exactly one POST request.
+    mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.once());
+  }
+
+  /**
+   * Test failed due to single event exceeding max buffer size.
+   *
+   * <p>The event's payload is sized to exactly {@code maxBufferSize + 1}; it is expected to be
+   * reported as a {@link DatadogWriteError} without any request reaching the server.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void failedDatadogEventTooBig() {
+
+    // Any request that did reach the server would be answered with 404; none is expected.
+    addRequestExpectation(404);
+
+    int testPort = mockServer.getPort();
+
+    String payloadFormat = "{\"message\":\"%s\"}";
+
+    long maxBufferSize = 100;
+    // Pad the message so that JSON envelope + message is one larger than the max buffer size.
+    char[] bunchOfAs =
+        new char
+            [(int)
+                (maxBufferSize
+                    + 1L
+                    - DatadogEventSerializer.getPayloadSize(String.format(payloadFormat, "")))];
+    Arrays.fill(bunchOfAs, 'a');
+    String messageTooBig = new String(bunchOfAs);
+
+    // Sanity-check the fixture: the serialized payload really is maxBufferSize + 1.
+    String expectedPayload = String.format(payloadFormat, messageTooBig);
+    long expectedPayloadSize = DatadogEventSerializer.getPayloadSize(expectedPayload);
+    assertThat(maxBufferSize + 1L).isEqualTo(expectedPayloadSize);
+
+    List<KV<Integer, DatadogEvent>> testEvents =
+        ImmutableList.of(KV.of(123, DatadogEvent.newBuilder().withMessage(messageTooBig).build()));
+
+    PCollection<DatadogWriteError> actual =
+        pipeline
+            .apply(
+                "Create Input data",
+                Create.of(testEvents)
+                    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of())))
+            .apply(
+                "DatadogEventWriter",
+                ParDo.of(
+                    DatadogEventWriter.newBuilder()
+                        .withUrl(Joiner.on(':').join("http://localhost", testPort))
+                        .withMaxBufferSize(maxBufferSize)
+                        .withApiKey("test-api-key")
+                        .build()))
+            .setCoder(DatadogWriteErrorCoder.of());
+
+    // Expect a single DatadogWriteError due to exceeding max buffer size
+    PAssert.that(actual)
+        .containsInAnyOrder(DatadogWriteError.newBuilder().withPayload(expectedPayload).build());
+
+    pipeline.run();
+
+    // Server did not receive any requests.
+    mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.exactly(0));
+  }
+
+  /**
+   * Test retryable POST request: the server answers 408, 429 and 502 once each before returning
+   * 202, and the write is expected to succeed eventually without emitting a write error.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void retryableDatadogWriteSingleBatchTest() {
+
+    // Create server expectations for 3 retryable failures, 1 success.
+    addRequestExpectation(408, Times.once());
+    addRequestExpectation(429, Times.once());
+    addRequestExpectation(502, Times.once());
+    addRequestExpectation(202, Times.once());
+
+    int testPort = mockServer.getPort();
+
+    List<KV<Integer, DatadogEvent>> testEvents =
+        ImmutableList.of(
+            KV.of(
+                123,
+                DatadogEvent.newBuilder()
+                    .withSource("test-source-1")
+                    .withTags("test-tags-1")
+                    .withHostname("test-hostname-1")
+                    .withService("test-service-1")
+                    .withMessage("test-message-1")
+                    .build()));
+
+    PCollection<DatadogWriteError> actual =
+        pipeline
+            .apply(
+                "Create Input data",
+                Create.of(testEvents)
+                    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of())))
+            .apply(
+                "DatadogEventWriter",
+                ParDo.of(
+                    DatadogEventWriter.newBuilder(1)
+                        .withUrl(Joiner.on(':').join("http://localhost", testPort))
+                        .withInputBatchCount(testEvents.size()) // all requests in a single batch.
+                        .withApiKey("test-api-key")
+                        .build()))
+            .setCoder(DatadogWriteErrorCoder.of());
+
+    PAssert.that(actual).empty();
+
+    // All successful responses, eventually.
+    pipeline.run();
+
+    // Server received exactly 4 POST requests (3 retryable failures, 1 success).
+    mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.exactly(4));
+  }
+
+  /**
+   * Registers a mock-server expectation that responds with {@code statusCode}, with no limit on
+   * how many times it may be matched ({@code Times.unlimited()}).
+   *
+   * @param statusCode HTTP status code the mock server responds with
+   */
+  private void addRequestExpectation(int statusCode) {
+    addRequestExpectation(statusCode, Times.unlimited());
+  }
+
+  /**
+   * Registers a mock-server expectation for requests matching {@code EXPECTED_PATH}.
+   *
+   * @param statusCode HTTP status code the mock server responds with
+   * @param times how many times the expectation may be matched
+   */
+  private void addRequestExpectation(int statusCode, Times times) {
+    HttpRequest matchingRequest = HttpRequest.request(EXPECTED_PATH);
+    HttpResponse cannedResponse = HttpResponse.response().withStatusCode(statusCode);
+
+    mockServer.when(matchingRequest, times).respond(cannedResponse);
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogIOTest.java
 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogIOTest.java
new file mode 100644
index 00000000000..8680333b4dd
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogIOTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockserver.configuration.ConfigurationProperties;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.verify.VerificationTimes;
+
+/** Unit tests for {@link DatadogIO} class. */
+public class DatadogIOTest {
+
+  // First of the two fixture events written by every test in this class.
+  private static final DatadogEvent DATADOG_TEST_EVENT_1 =
+      DatadogEvent.newBuilder()
+          .withSource("test-source-1")
+          .withTags("test-tags-1")
+          .withHostname("test-hostname-1")
+          .withService("test-service-1")
+          .withMessage("test-message-1")
+          .build();
+
+  // Second fixture event; every field differs from DATADOG_TEST_EVENT_1.
+  private static final DatadogEvent DATADOG_TEST_EVENT_2 =
+      DatadogEvent.newBuilder()
+          .withSource("test-source-2")
+          .withTags("test-tags-2")
+          .withHostname("test-hostname-2")
+          .withService("test-service-2")
+          .withMessage("test-message-2")
+          .build();
+
+  // Pipeline input used by the tests.
+  private static final List<DatadogEvent> DATADOG_EVENTS =
+      ImmutableList.of(DATADOG_TEST_EVENT_1, DATADOG_TEST_EVENT_2);
+
+  // Request path the mock server matches on: "/" followed by the publisher's DD_URL_PATH.
+  private static final String EXPECTED_PATH = "/" + DatadogEventPublisher.DD_URL_PATH;
+  private static final int TEST_PARALLELISM = 2;
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  // We create a mock server to simulate an actual Datadog API server.
+  private ClientAndServer mockServer;
+
+  /** Starts a fresh mock server before each test and stores it in {@code mockServer}. */
+  @Before
+  public void setup() throws IOException {
+    ConfigurationProperties.disableSystemOut(true);
+    mockServer = startClientAndServer();
+  }
+
+  /**
+   * Stops the mock server after each test, if one was started, so that server instances do not
+   * accumulate across tests.
+   */
+  @After
+  public void tearDown() {
+    if (mockServer != null) {
+      mockServer.stop();
+    }
+  }
+
+  /**
+   * Test successful multi-event POST request for DatadogIO without parallelism: with a
+   * parallelism of 1 and a batch count equal to the number of events, a single request is
+   * expected.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void successfulDatadogIOMultiBatchNoParallelismTest() {
+
+    // Create server expectation for success.
+    mockServerListening(200);
+    PCollection<DatadogWriteError> actual =
+        pipeline
+            .apply("Create Input data", Create.of(DATADOG_EVENTS).withCoder(DatadogEventCoder.of()))
+            .apply(
+                "DatadogIO",
+                DatadogIO.writeBuilder(1)
+                    .withParallelism(1)
+                    .withBatchCount(DATADOG_EVENTS.size())
+                    .withApiKey("test-api-key")
+                    .withUrl(Joiner.on(':').join("http://localhost", mockServer.getPort()))
+                    .build())
+            .setCoder(DatadogWriteErrorCoder.of());
+
+    // All successful responses.
+    PAssert.that(actual).empty();
+
+    pipeline.run();
+
+    // Server received exactly one POST request.
+    mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.once());
+  }
+
+  /**
+   * Test successful multi-event POST request for DatadogIO with parallelism: with a parallelism
+   * of {@code TEST_PARALLELISM} and a batch count equal to the number of events, at least one
+   * request is expected.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void successfulDatadogIOMultiBatchParallelismTest() {
+
+    // Create server expectation for success.
+    mockServerListening(200);
+    PCollection<DatadogWriteError> actual =
+        pipeline
+            .apply("Create Input data", Create.of(DATADOG_EVENTS).withCoder(DatadogEventCoder.of()))
+            .apply(
+                "DatadogIO",
+                DatadogIO.writeBuilder(1)
+                    .withParallelism(TEST_PARALLELISM)
+                    .withBatchCount(DATADOG_EVENTS.size())
+                    .withApiKey("test-api-key")
+                    .withUrl(Joiner.on(':').join("http://localhost", mockServer.getPort()))
+                    .build())
+            .setCoder(DatadogWriteErrorCoder.of());
+
+    // All successful responses.
+    PAssert.that(actual).empty();
+
+    pipeline.run();
+
+    // Server received at least one POST request; the exact number is not asserted here.
+    mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.atLeast(1));
+  }
+
+  /**
+   * Test successful single-event batches for DatadogIO with parallelism: with a batch count of 1,
+   * exactly one POST request per event is expected.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void successfulDatadogIOSingleBatchParallelismTest() {
+
+    // Create server expectation for success.
+    mockServerListening(200);
+    PCollection<DatadogWriteError> actual =
+        pipeline
+            .apply("Create Input data", Create.of(DATADOG_EVENTS).withCoder(DatadogEventCoder.of()))
+            .apply(
+                "DatadogIO",
+                DatadogIO.writeBuilder(1)
+                    .withParallelism(TEST_PARALLELISM)
+                    .withBatchCount(1)
+                    .withApiKey("test-api-key")
+                    .withUrl(Joiner.on(':').join("http://localhost", mockServer.getPort()))
+                    .build())
+            .setCoder(DatadogWriteErrorCoder.of());
+
+    // All successful responses.
+    PAssert.that(actual).empty();
+
+    pipeline.run();
+
+    // Server received exactly 1 post request per DatadogEvent
+    mockServer.verify(
+        HttpRequest.request(EXPECTED_PATH), VerificationTimes.exactly(DATADOG_EVENTS.size()));
+  }
+
+  /**
+   * Registers a mock-server expectation that answers requests matching {@code EXPECTED_PATH}
+   * with the given status code.
+   *
+   * @param statusCode HTTP status code the mock server responds with
+   */
+  private void mockServerListening(int statusCode) {
+    HttpRequest matchingRequest = HttpRequest.request(EXPECTED_PATH);
+    HttpResponse cannedResponse = HttpResponse.response().withStatusCode(statusCode);
+
+    mockServer.when(matchingRequest).respond(cannedResponse);
+  }
+}
diff --git 
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoderTest.java
 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoderTest.java
new file mode 100644
index 00000000000..e5932d2b612
--- /dev/null
+++ 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.junit.Test;
+
+/** Unit tests for the {@link DatadogWriteErrorCoder} class. */
+public class DatadogWriteErrorCoderTest {
+
+  /**
+   * Test whether {@link DatadogWriteErrorCoder} is able to encode a {@link DatadogWriteError} and
+   * decode it back into an equal instance.
+   *
+   * @throws IOException if encoding to or decoding from the in-memory streams fails
+   */
+  @Test
+  public void testEncodeDecode() throws IOException {
+    String payload = "test-payload";
+    String message = "test-message";
+    Integer statusCode = 123;
+
+    DatadogWriteError expectedError =
+        DatadogWriteError.newBuilder()
+            .withPayload(payload)
+            .withStatusCode(statusCode)
+            .withStatusMessage(message)
+            .build();
+
+    // Round trip: encode into an in-memory buffer, then decode from the bytes just written.
+    DatadogWriteErrorCoder coder = DatadogWriteErrorCoder.of();
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+      coder.encode(expectedError, bos);
+      try (ByteArrayInputStream bin = new ByteArrayInputStream(bos.toByteArray())) {
+        DatadogWriteError decodedWriteError = coder.decode(bin);
+        assertThat(decodedWriteError, is(equalTo(expectedError)));
+      }
+    }
+  }
+}
diff --git a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorTest.java b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorTest.java
new file mode 100644
index 00000000000..0aadc1f7018
--- /dev/null
+++ b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.junit.Test;
+
+/** Unit tests for the {@link DatadogWriteError} class. */
+public class DatadogWriteErrorTest {
+
+  /**
+   * Checks that two {@link DatadogWriteError} instances built from the same values are equal,
+   * and that changing only the status message makes them unequal.
+   */
+  @Test
+  public void testEquals() {
+    // Field values shared by the instances compared below.
+    final String body = "test-payload";
+    final String statusText = "test-message";
+    final Integer code = 123;
+
+    DatadogWriteError reference =
+        DatadogWriteError.newBuilder()
+            .withPayload(body)
+            .withStatusCode(code)
+            .withStatusMessage(statusText)
+            .build();
+
+    // Built from identical field values, so it must compare equal to the reference.
+    DatadogWriteError sameFields =
+        DatadogWriteError.newBuilder()
+            .withPayload(body)
+            .withStatusCode(code)
+            .withStatusMessage(statusText)
+            .build();
+    assertThat(reference, is(equalTo(sameFields)));
+
+    // Differs only in the status message, so it must not compare equal to the reference.
+    DatadogWriteError otherMessage =
+        DatadogWriteError.newBuilder()
+            .withPayload(body)
+            .withStatusCode(code)
+            .withStatusMessage("a-different-message")
+            .build();
+    assertThat(reference, is(not(equalTo(otherMessage))));
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index d99000383ea..4540fa4b597 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -225,6 +225,7 @@ include(":sdks:java:io:file-based-io-tests")
 include(":sdks:java:io:bigquery-io-perf-tests")
 include(":sdks:java:io:cdap")
 include(":sdks:java:io:csv")
+include(":sdks:java:io:datadog")
 include(":sdks:java:io:file-schema-transform")
 include(":sdks:java:io:google-ads")
 include(":sdks:java:io:google-cloud-platform")

Reply via email to