tvalentyn commented on code in PR #38362:
URL: https://github.com/apache/beam/pull/38362#discussion_r3204932627


##########
sdks/python/apache_beam/yaml/integration_tests.py:
##########
@@ -534,6 +561,171 @@ def temp_pubsub_emulator(project_id="apache-beam-testing"):
     yield created_topic_object.name
 
 
+class DatadogContainer(DockerContainer):

Review Comment:
   should we move this content to a helper file, like datadog_test_utils.py or something?



##########
sdks/python/apache_beam/yaml/tests/datadog.yaml:
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+  - name: DATADOG_AGENT
+    type: "apache_beam.yaml.integration_tests.temp_datadog_agent"
+    config:
+      expected_records:
+        - { ddsource: "apache-beam1", ddtags: "test-tags1", hostname: "test-host", service: "test-service", message: "Event for label 11a" }
+        - { ddsource: "apache-beam2", ddtags: "test-tags2", hostname: "test-host", service: "test-service", message: "Event for label 37a" }
+        - { ddsource: "apache-beam3", ddtags: "test-tags3", hostname: "test-host", service: "test-service", message: "Event for label 389a" }
+        - { ddsource: "apache-beam4", ddtags: "test-tags4", hostname: "test-host", service: "test-service", message: "Event for label 3821b" }
+
+pipelines:
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - { ddsource: "apache-beam1", ddtags: "test-tags1", hostname: "test-host", service: "test-service", message: "Event for label 11a" }
+              - { ddsource: "apache-beam2", ddtags: "test-tags2", hostname: "test-host", service: "test-service", message: "Event for label 37a" }
+              - { ddsource: "apache-beam3", ddtags: "test-tags3", hostname: "test-host", service: "test-service", message: "Event for label 389a" }
+              - { ddsource: "apache-beam4", ddtags: "test-tags4", hostname: "test-host", service: "test-service", message: "Event for label 3821b" }
+              - { ddsource: "apache-beam-broken", hostname: "test-host" } # Triggers mandatory-field failure
+        - type: WriteToDatadog
+          input: Create
+          config:
+            url: "{DATADOG_AGENT.url}"
+            api_key: "{DATADOG_AGENT.api_key}"
+            min_batch_count: 1
+            batch_count: 2
+            max_buffer_size: 1000
+            parallelism: 1
+            error_handling:
+              output: error_output
+        - type: MapToFields
+          input: WriteToDatadog.error_output
+          config:
+            language: python
+            fields:
+              failed_source: "failed_row.ddsource"
+        - type: AssertEqual
+          input: MapToFields
+          config:
+            elements:
+              - { failed_source: "apache-beam-broken" }
+        # Asserting good records is taken care of by the fixture

Review Comment:
   for my education, what does this mean?
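   If helpful, here is one reading of that comment, as a hedged sketch (the names `temp_agent_with_assertions` and the bare `received` list below are illustrative, not the actual fixture API): the `temp_datadog_agent` fixture could verify on teardown that every record in `expected_records` actually reached the mock server, so the YAML pipeline itself only needs to assert the error path.

```python
# Hypothetical sketch: a fixture that asserts the "good" records on exit.
# In the real test, the mock HTTP server would append parsed log records
# to the list; here we simulate delivery with direct appends.
import contextlib


@contextlib.contextmanager
def temp_agent_with_assertions(expected_records):
  received = []  # the mock server would append decoded log records here
  try:
    yield received
  finally:
    # Teardown-time assertion: every expected record must have arrived.
    missing = [r for r in expected_records if r not in received]
    assert not missing, f"Records never reached the mock server: {missing}"


# Usage: the pipeline (simulated by a direct append) delivers the record,
# and the fixture checks it when the 'with' block exits.
expected = [{"ddsource": "apache-beam1", "message": "Event for label 11a"}]
with temp_agent_with_assertions(expected) as sink:
  sink.append({"ddsource": "apache-beam1", "message": "Event for label 11a"})
print("fixture assertions passed")
```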



##########
sdks/python/apache_beam/yaml/integration_tests.py:
##########
@@ -20,19 +20,46 @@
 import contextlib
 import copy
 import glob
+import gzip
+import http.server
+import io
 import itertools
+import json
 import logging
 import os
 import random
 import secrets
 import sqlite3
 import string
+import struct
+import threading
 import unittest
 import uuid
 from datetime import datetime
 from datetime import timezone
 
 import mock
+
+from apache_beam.coders import Coder
+from apache_beam.coders.coder_impl import CoderImpl
+
+
+class BigEndianIntegerCoderImpl(CoderImpl):
+  def encode_to_stream(self, value, stream, nested):
+    stream.write(struct.pack('>i', value))
+
+  def decode_from_stream(self, stream, nested):
+    return struct.unpack('>i', stream.read(4))[0]
+
+
+class BigEndianIntegerCoder(Coder):
+  def get_impl(self):
+    return BigEndianIntegerCoderImpl()
+
+
+Coder.register_urn(

Review Comment:
   for my education, why did we have to do this?
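   One plausible reason (worth confirming with the author): the Java transform's output crosses the expansion-service boundary encoded with Java's `BigEndianIntegerCoder`, so the Python side must register a coder with an identical wire format under the same URN (the `register_urn` call is truncated in this hunk). A minimal sketch of that wire format, assuming 4-byte signed big-endian integers:

```python
# Round-trip check of the big-endian encoding the coder above uses.
# '>i' = big-endian, 4-byte signed int, matching encode_to_stream;
# this only demonstrates the byte layout, not the URN registration.
import io
import struct


def encode_big_endian_int(value: int) -> bytes:
  return struct.pack('>i', value)


def decode_big_endian_int(data: bytes) -> int:
  return struct.unpack('>i', io.BytesIO(data).read(4))[0]


assert encode_big_endian_int(1) == b'\x00\x00\x00\x01'
assert decode_big_endian_int(encode_big_endian_int(-42)) == -42
```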



##########
sdks/python/apache_beam/yaml/integration_tests.py:
##########
@@ -534,6 +561,171 @@ def temp_pubsub_emulator(project_id="apache-beam-testing"):
     yield created_topic_object.name
 
 
+class DatadogContainer(DockerContainer):
+  """
+  DatadogContainer starts a Datadog agent container for integration tests.
+  It exposes ports for DogStatsD (metrics) and the trace agent (APM).
+  """
+  def __init__(self, image="datadog/agent:latest"):
+    super().__init__(image)
+    self.statsd_port = 8125
+    self.trace_port = 8126
+    # An API key is required, but for local testing against the agent,
+    # it doesn't have to be a valid one.
+    self.with_env("DD_API_KEY", "dummy_key_for_testing")
+    # Redirect agent's own telemetry to prevent 403 errors from the real site
+    self.with_env("DD_DD_URL", "http://localhost:1234")
+    # Enable log collection so the agent's logs pipeline is exercised in tests
+    self.with_env("DD_LOGS_ENABLED", "true")
+    self.with_exposed_ports(self.statsd_port, self.trace_port)
+
+  def start(self):
+    super().start()
+    # Wait for the agent to be ready to receive traces and metrics.
+    # "Agent started" indicates the core agent is up.
+    # Disabling this wait as the container is reported as healthy and
+    # this specific log might not appear with dummy URLs.
+    # wait_for_logs(self, "Agent started", timeout=120)
+    return self
+
+  def get_statsd_host(self):
+    return self.get_container_host_ip()
+
+  def get_statsd_port(self):
+    return self.get_exposed_port(self.statsd_port)
+
+  def get_trace_agent_host(self):
+    return self.get_container_host_ip()
+
+  def get_trace_agent_port(self):
+    return self.get_exposed_port(self.trace_port)
+
+  def get_api_key(self):
+    return "dummy_key_for_testing"
+
+  def get_logs_url(self):
+    # The trace agent and logs agent listen on the same port by default
+    return \
+      f"http://{self.get_container_host_ip()}:{self.get_trace_agent_port()}"
+
+
+class DatadogConnection:
+  def __init__(self, url, api_key):
+    self.url = url
+    self.api_key = api_key
+
+
+class MockDatadogHandler(http.server.BaseHTTPRequestHandler):
+  def do_POST(self):
+    if self.path == "/api/v2/logs":
+      is_chunked = self.headers.get('Transfer-Encoding',
+                                    '').lower() == 'chunked'
+      is_gzip = self.headers.get('Content-Encoding', '').lower() == 'gzip'
+      content_len = int(self.headers.get('Content-Length', 0))
+
+      try:
+        raw_data = b''
+        if is_chunked:
+          while True:
+            line = self.rfile.readline().strip()
+            if not line:
+              break
+            chunk_len = int(line, 16)
+            if chunk_len == 0:
+              self.rfile.readline()  # Clear trail
+              break
+            raw_data += self.rfile.read(chunk_len)
+            self.rfile.readline()  # Clear trail
+        elif content_len > 0:
+          raw_data = self.rfile.read(content_len)
+
+        if raw_data and is_gzip:
+          with gzip.GzipFile(fileobj=io.BytesIO(raw_data)) as f:
+            raw_data = f.read()
+
+        if raw_data:
+          data = json.loads(raw_data)
+          with self.server.record_lock:
+            if isinstance(data, list):
+              self.server.received_records.extend(data)
+            else:
+              self.server.received_records.append(data)
+      except Exception as e:
+        logging.error("CRITICAL: Failure unpacking mock datadog payload: %s", e)
+
+      self.send_response(200)
+      self.send_header('Content-Type', 'application/json')
+      self.end_headers()
+      self.wfile.write(b'{"status": "ok"}')
+    else:
+      self.send_response(404)
+      self.end_headers()
+
+  def log_message(self, format, *args):
+    pass
+
+
[email protected]
+def temp_datadog_mock_server(received_records):
+  server = http.server.ThreadingHTTPServer(('localhost', 0), MockDatadogHandler)
+  server.received_records = received_records
+  server.record_lock = threading.Lock()
+  ip, port = server.server_address
+  thread = threading.Thread(target=server.serve_forever)
+  thread.daemon = True
+  thread.start()
+  try:
+    yield f"http://{ip}:{port}"
+  finally:
+    server.shutdown()
+    server.server_close()
+    thread.join()
+
+
[email protected]
+def temp_datadog_agent(expected_records=None):
+  """Context manager to provide a temporary Datadog Agent for testing.
+
+  This function utilizes the 'testcontainers' library to spin up a
+  Datadog Agent instance within a Docker container. It yields a dictionary
+  containing connection details for the Datadog agent.
+
+  The Docker container is automatically managed and torn down when the
+  context manager exits.
+
+  Yields:
+      dict: A dictionary with connection details:
+            {'url': <url>, 'api_key': <api_key>}
+
+  Raises:
+      Exception: Any exception encountered during the setup process.
+  """
+  received = []
+  with DatadogContainer() as datadog_container,\
+    temp_datadog_mock_server(received) as mock_url:
+    try:
+      yield DatadogConnection(
+          url=mock_url,
+          api_key=datadog_container.get_api_key(),

Review Comment:
   do we use the datadog_container (with the real Datadog Agent) only to get the api key?



##########
sdks/python/apache_beam/yaml/standard_io.yaml:
##########
@@ -428,3 +428,22 @@
       config:
         gradle_target: 'sdks:java:io:expansion-service:shadowJar'
 
+- type: renaming
+  transforms:
+    'WriteToDatadog': 'WriteToDatadog'
+  config:
+    mappings:
+      'WriteToDatadog':
+        'url': 'url'
+        'api_key': 'api_key'
+        'min_batch_count': 'min_batch_count'
+        'batch_count': 'batch_count'
+        'max_buffer_size': 'max_buffer_size'
+        'parallelism': 'parallelism'
+        'error_handling': 'error_handling'
+    underlying_provider:
+      type: beamJar
+      transforms:
+        'WriteToDatadog': 'beam:schematransform:org.apache.beam:datadog_write:v1'

Review Comment:
   1) Does this use Beam Java's sdk/io/datadog?
   2) If you happen to know, how do we send data points to Datadog? I see that you are involving a Datadog agent in the IT plus a fake Datadog server. Do we go through an agent in the Beam Java SDK, or do we make HTTP calls to the server directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
