derrickaw commented on code in PR #38362:
URL: https://github.com/apache/beam/pull/38362#discussion_r3209107015


##########
sdks/python/apache_beam/yaml/integration_tests.py:
##########
@@ -534,6 +561,171 @@ def temp_pubsub_emulator(project_id="apache-beam-testing"):
     yield created_topic_object.name
 
 
+class DatadogContainer(DockerContainer):
+  """
+  DatadogContainer starts a Datadog agent container for integration tests.
+  It exposes ports for DogStatsD (metrics) and the trace agent (APM).
+  """
+  def __init__(self, image="datadog/agent:latest"):
+    super().__init__(image)
+    self.statsd_port = 8125
+    self.trace_port = 8126
+    # An API key is required, but for local testing against the agent,
+    # it doesn't have to be a valid one.
+    self.with_env("DD_API_KEY", "dummy_key_for_testing")
+    # Redirect agent's own telemetry to prevent 403 errors from the real site
+    self.with_env("DD_DD_URL", "http://localhost:1234")
+    # Disable log collection for test purposes to reduce noise
+    self.with_env("DD_LOGS_ENABLED", "true")
+    self.with_exposed_ports(self.statsd_port, self.trace_port)
+
+  def start(self):
+    super().start()
+    # Wait for the agent to be ready to receive traces and metrics.
+    # "Agent started" indicates the core agent is up.
+    # Disabling this wait as the container is reported as healthy and
+    # this specific log might not appear with dummy URLs.
+    # wait_for_logs(self, "Agent started", timeout=120)
+    return self
+
+  def get_statsd_host(self):
+    return self.get_container_host_ip()
+
+  def get_statsd_port(self):
+    return self.get_exposed_port(self.statsd_port)
+
+  def get_trace_agent_host(self):
+    return self.get_container_host_ip()
+
+  def get_trace_agent_port(self):
+    return self.get_exposed_port(self.trace_port)
+
+  def get_api_key(self):
+    return "dummy_key_for_testing"
+
+  def get_logs_url(self):
+    # The trace agent and logs agent listen on the same port by default
+    return \
+      f"http://{self.get_container_host_ip()}:{self.get_trace_agent_port()}"
+
+
+class DatadogConnection:
+  def __init__(self, url, api_key):
+    self.url = url
+    self.api_key = api_key
+
+
+class MockDatadogHandler(http.server.BaseHTTPRequestHandler):
+  def do_POST(self):
+    if self.path == "/api/v2/logs":
+      is_chunked = self.headers.get('Transfer-Encoding',
+                                    '').lower() == 'chunked'
+      is_gzip = self.headers.get('Content-Encoding', '').lower() == 'gzip'
+      content_len = int(self.headers.get('Content-Length', 0))
+
+      try:
+        raw_data = b''
+        if is_chunked:
+          while True:
+            line = self.rfile.readline().strip()
+            if not line:
+              break
+            chunk_len = int(line, 16)
+            if chunk_len == 0:
+              self.rfile.readline()  # Clear trail
+              break
+            raw_data += self.rfile.read(chunk_len)
+            self.rfile.readline()  # Clear trail
+        elif content_len > 0:
+          raw_data = self.rfile.read(content_len)
+
+        if raw_data and is_gzip:
+          with gzip.GzipFile(fileobj=io.BytesIO(raw_data)) as f:
+            raw_data = f.read()
+
+        if raw_data:
+          data = json.loads(raw_data)
+          with self.server.record_lock:
+            if isinstance(data, list):
+              self.server.received_records.extend(data)
+            else:
+              self.server.received_records.append(data)
+      except Exception as e:
+        logging.error("CRITICAL: Failure unpacking mock datadog payload: %s", e)
+
+      self.send_response(200)
+      self.send_header('Content-Type', 'application/json')
+      self.end_headers()
+      self.wfile.write(b'{"status": "ok"}')
+    else:
+      self.send_response(404)
+      self.end_headers()
+
+  def log_message(self, format, *args):
+    pass
+
+
+@contextlib.contextmanager
+def temp_datadog_mock_server(received_records):
+  server = http.server.ThreadingHTTPServer(('localhost', 0), MockDatadogHandler)
+  server.received_records = received_records
+  server.record_lock = threading.Lock()
+  ip, port = server.server_address
+  thread = threading.Thread(target=server.serve_forever)
+  thread.daemon = True
+  thread.start()
+  try:
+    yield f"http://{ip}:{port}"
+  finally:
+    server.shutdown()
+    server.server_close()
+    thread.join()
+
+
+@contextlib.contextmanager
+def temp_datadog_agent(expected_records=None):
+  """Context manager to provide a temporary Datadog Agent for testing.
+
+  This function utilizes the 'testcontainers' library to spin up a
+  Datadog Agent instance within a Docker container. It yields a dictionary
+  containing connection details for the Datadog agent.
+
+  The Docker container is automatically managed and torn down when the
+  context manager exits.
+
+  Yields:
+      dict: A dictionary with connection details:
+            {'url': <url>, 'api_key': <api_key>}
+
+  Raises:
+      Exception: Any exception encountered during the setup process.
+  """
+  received = []
+  with DatadogContainer() as datadog_container,\
+    temp_datadog_mock_server(received) as mock_url:
+    try:
+      yield DatadogConnection(
+          url=mock_url,
+          api_key=datadog_container.get_api_key(),

Review Comment:
   Good call. Removed that. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to