This is an automated email from the ASF dual-hosted git repository.

sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new db9bad9  [hotfix][runtime] EventLogger displays the content of 
PythonEvent as string (#367)
db9bad9 is described below

commit db9bad9e35d40b938f8def2f43ceb03c27a47df2
Author: Xuannan <[email protected]>
AuthorDate: Tue Dec 2 16:01:37 2025 +0800

    [hotfix][runtime] EventLogger displays the content of PythonEvent as string 
(#367)
    
    * [hotfix][runtime] EventLogger displays the content of PythonEvent as 
string
---
 .../e2e_tests/python_event_logging_test.py         | 123 ++++++++++++++++++++
 .../flink_agents/runtime/flink_runner_context.py   |   4 +-
 python/flink_agents/runtime/python_java_utils.py   |  11 +-
 .../python/context/PythonRunnerContextImpl.java    |   4 +-
 .../agents/runtime/python/event/PythonEvent.java   |  36 +++++-
 .../runtime/python/utils/PythonActionExecutor.java |  10 +-
 .../runtime/python/event/PythonEventTest.java      | 125 +++++++++++++++++++++
 7 files changed, 299 insertions(+), 14 deletions(-)

diff --git a/python/flink_agents/e2e_tests/python_event_logging_test.py 
b/python/flink_agents/e2e_tests/python_event_logging_test.py
new file mode 100644
index 0000000..3486fa5
--- /dev/null
+++ b/python/flink_agents/e2e_tests/python_event_logging_test.py
@@ -0,0 +1,123 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import json
+import shutil
+import tempfile
+from pathlib import Path
+
+from pyflink.common import Configuration, WatermarkStrategy
+from pyflink.datastream import (
+    KeySelector,
+    RuntimeExecutionMode,
+    StreamExecutionEnvironment,
+)
+from pyflink.datastream.connectors.file_system import (
+    FileSource,
+    StreamFormat,
+)
+
+from flink_agents.api.agent import Agent
+from flink_agents.api.decorators import action
+from flink_agents.api.events.event import InputEvent, OutputEvent
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.api.runner_context import RunnerContext
+
+
+class InputKeySelector(KeySelector):
+    """Key selector for input data."""
+
+    def get_key(self, value: dict) -> int:
+        """Extract key from input data."""
+        return value.get("id", 0)
+
+
+class PythonEventLoggingAgent(Agent):
+    """Agent for testing PythonEvent logging."""
+
+    @action(InputEvent)
+    @staticmethod
+    def process_input(event: InputEvent, ctx: RunnerContext) -> None:
+        """Process input event and send a PythonEvent."""
+        # Send a PythonEvent that should be logged with readable content
+        input_data = event.input
+        ctx.send_event(
+            OutputEvent(output={"processed_review": f"{input_data['review']}"})
+        )
+
+
+def test_python_event_logging() -> None:
+    """Test that PythonEvent can be logged with readable content."""
+    # Check that log files were created in the default location
+    default_log_dir = Path(tempfile.gettempdir()) / "flink-agents"
+    shutil.rmtree(default_log_dir, ignore_errors=True)
+
+    config = Configuration()
+    env = StreamExecutionEnvironment.get_execution_environment(config)
+    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+    env.set_parallelism(1)
+
+    # Create agent environment
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+
+    # Set up input source
+    current_dir = Path(__file__).parent
+    input_datastream = env.from_source(
+        source=FileSource.for_record_stream_format(
+            StreamFormat.text_line_format(),
+            f"file:///{current_dir}/resources/input/input_data.txt",
+        ).build(),
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="python_event_logging_test",
+    )
+
+    # Parse input
+    deserialize_datastream = input_datastream.map(lambda x: json.loads(x))
+
+    # Apply agent
+    agents_env.from_datastream(
+        input=deserialize_datastream, key_selector=InputKeySelector()
+    ).apply(PythonEventLoggingAgent()).to_datastream()
+
+    # Execute the job
+    agents_env.execute()
+
+    # Also check our custom log directory
+    log_files = []
+    if default_log_dir.exists():
+        log_files.extend(default_log_dir.glob("events-*.log"))
+
+    # At least one log file should exist
+    assert len(log_files) > 0, (
+        f"Event log files should be created in {default_log_dir}"
+    )
+
+    # Check that log files contain readable PythonEvent content
+    log_content = ""
+    for log_file in log_files:
+        with log_file.open() as f:
+            log_content += f.read()
+
+    print(log_content)
+
+    # Verify log contains expected content - should have readable event data 
via
+    # eventString
+    assert "processed_review" in log_content, (
+        "Log should contain processed event content from eventString"
+    )
+    assert "eventString" in log_content, "Log should contain eventString field"
+    assert "eventType" in log_content, "Log should contain event type 
information"
diff --git a/python/flink_agents/runtime/flink_runner_context.py 
b/python/flink_agents/runtime/flink_runner_context.py
index c226d18..56f2ebb 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -64,8 +64,10 @@ class FlinkRunnerContext(RunnerContext):
             The event to be processed by the agent system.
         """
         class_path = 
f"{event.__class__.__module__}.{event.__class__.__qualname__}"
+        event_bytes = cloudpickle.dumps(event)
+        event_string = str(event)
         try:
-            self._j_runner_context.sendEvent(class_path, 
cloudpickle.dumps(event))
+            self._j_runner_context.sendEvent(class_path, event_bytes, 
event_string)
         except Exception as e:
             err_msg = "Failed to send event " + class_path + " to runner 
context"
             raise RuntimeError(err_msg) from e
diff --git a/python/flink_agents/runtime/python_java_utils.py 
b/python/flink_agents/runtime/python_java_utils.py
index 4190c55..586e6cb 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/python_java_utils.py
@@ -27,9 +27,14 @@ def convert_to_python_object(bytesObject: bytes) -> Any:
     return cloudpickle.loads(bytesObject)
 
 
-def wrap_to_input_event(bytesObject: bytes) -> bytes:
-    """Wrap data to python input event and serialize."""
-    return cloudpickle.dumps(InputEvent(input=cloudpickle.loads(bytesObject)))
+def wrap_to_input_event(bytesObject: bytes) -> tuple[bytes, str]:
+    """Wrap data to python input event and serialize.
+
+    Returns:
+        A tuple of (serialized_event_bytes, event_string_representation)
+    """
+    event = InputEvent(input=cloudpickle.loads(bytesObject))
+    return (cloudpickle.dumps(event), str(event))
 
 
 def get_output_from_output_event(bytesObject: bytes) -> Any:
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
index 4bdb8d8..26d5a79 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
@@ -58,9 +58,9 @@ public class PythonRunnerContextImpl extends 
RunnerContextImpl {
         super.sendEvent(event);
     }
 
-    public void sendEvent(String type, byte[] event) {
+    public void sendEvent(String type, byte[] event, String eventString) {
         // this method will be invoked by PythonActionExecutor's python 
interpreter.
-        sendEvent(new PythonEvent(event, type));
+        sendEvent(new PythonEvent(event, type, eventString));
     }
 
     public PythonActionExecutor getPythonActionExecutor() {
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
index 1a99e97..225e3e9 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
@@ -21,6 +21,7 @@ package org.apache.flink.agents.runtime.python.event;
 
 import org.apache.flink.agents.api.Event;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.Arrays;
@@ -28,15 +29,23 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 
-/** An event generated by the framework, passing a Python event to the Java 
agent runner. */
+/**
+ * An event generated by the framework, passing a Python event to the Java 
agent runner.
+ *
+ * <p>This class stores Python objects as serialized byte arrays for 
processing, and also maintains
+ * a human-readable string representation (eventString) for logging purposes. 
The eventString is
+ * generated on the Python side when the event is created.
+ */
 public class PythonEvent extends Event {
     private final byte[] event;
     private final String eventType;
+    private final String eventString;
 
-    public PythonEvent(byte[] event, String eventType) {
+    public PythonEvent(byte[] event, String eventType, String eventString) {
         super();
         this.event = event;
         this.eventType = eventType;
+        this.eventString = eventString;
     }
 
     @JsonCreator
@@ -44,12 +53,15 @@ public class PythonEvent extends Event {
             @JsonProperty("id") UUID id,
             @JsonProperty("attributes") Map<String, Object> attributes,
             @JsonProperty("event") byte[] event,
-            @JsonProperty("eventType") String eventType) {
+            @JsonProperty("eventType") String eventType,
+            @JsonProperty("eventString") String eventString) {
         super(id, attributes);
         this.event = event;
         this.eventType = eventType;
+        this.eventString = eventString;
     }
 
+    @JsonIgnore // Don't serialize byte array in logs - used for processing 
only
     public byte[] getEvent() {
         return event;
     }
@@ -58,6 +70,18 @@ public class PythonEvent extends Event {
         return eventType;
     }
 
+    /**
+     * Returns the human-readable string representation of this event.
+     *
+     * <p>This string is generated on the Python side when the event is 
created and is primarily
+     * used for logging purposes.
+     *
+     * @return the string representation of the event, or null if not available
+     */
+    public String getEventString() {
+        return eventString;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -66,11 +90,13 @@ public class PythonEvent extends Event {
             return false;
         }
         PythonEvent other = (PythonEvent) o;
-        return Arrays.equals(event, other.event) && 
Objects.equals(this.eventType, other.eventType);
+        return Arrays.equals(event, other.event)
+                && Objects.equals(this.eventType, other.eventType)
+                && Objects.equals(this.eventString, other.eventString);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), Arrays.hashCode(event), 
eventType);
+        return Objects.hash(super.hashCode(), Arrays.hashCode(event), 
eventType, eventString);
     }
 }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 014d450..ded0140 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -124,9 +124,13 @@ public class PythonActionExecutor {
 
     public PythonEvent wrapToInputEvent(Object eventData) {
         checkState(eventData instanceof byte[]);
-        return new PythonEvent(
-                (byte[]) interpreter.invoke(WRAP_TO_INPUT_EVENT, eventData),
-                EventUtil.PYTHON_INPUT_EVENT_NAME);
+        // wrap_to_input_event returns a tuple of (bytes, str)
+        Object result = interpreter.invoke(WRAP_TO_INPUT_EVENT, eventData);
+        checkState(result.getClass().isArray() && ((Object[]) result).length 
== 2);
+        Object[] resultArray = (Object[]) result;
+        byte[] eventBytes = (byte[]) resultArray[0];
+        String eventString = (String) resultArray[1];
+        return new PythonEvent(eventBytes, EventUtil.PYTHON_INPUT_EVENT_NAME, 
eventString);
     }
 
     public Object getOutputFromOutputEvent(byte[] pythonOutputEvent) {
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
new file mode 100644
index 0000000..600583e
--- /dev/null
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.agents.runtime.python.event;
+
+import org.apache.flink.agents.api.EventContext;
+import org.apache.flink.agents.runtime.eventlog.EventLogRecord;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link PythonEvent}. */
+class PythonEventTest {
+
+    private ObjectMapper objectMapper;
+
+    @BeforeEach
+    void setUp() {
+        objectMapper = new ObjectMapper();
+    }
+
+    @Test
+    void testCreatePythonEventWithEventString() {
+        // Given
+        byte[] eventBytes = new byte[] {1, 2, 3, 4, 5};
+        String eventType = "flink_agents.api.events.event.InputEvent";
+        String eventString = "InputEvent(input='test data')";
+
+        // When
+        PythonEvent event = new PythonEvent(eventBytes, eventType, 
eventString);
+
+        // Then
+        assertThat(event.getEvent()).isEqualTo(eventBytes);
+        assertThat(event.getEventType()).isEqualTo(eventType);
+        assertThat(event.getEventString()).isEqualTo(eventString);
+    }
+
+    @Test
+    void testJsonSerializationWithEventString() throws Exception {
+        // Given
+        UUID expectedId = UUID.randomUUID();
+        Map<String, Object> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("testKey", "testValue");
+        byte[] eventBytes = "test_bytes".getBytes();
+        String eventType = "flink_agents.api.events.event.OutputEvent";
+        String eventString = "OutputEvent(output={'key': 'value'})";
+
+        PythonEvent event =
+                new PythonEvent(expectedId, expectedAttributes, eventBytes, 
eventType, eventString);
+
+        // When
+        String json = objectMapper.writeValueAsString(event);
+
+        // Then
+        JsonNode jsonNode = objectMapper.readTree(json);
+        assertThat(jsonNode.has("id")).isTrue();
+        assertThat(jsonNode.has("eventType")).isTrue();
+        assertThat(jsonNode.has("eventString")).isTrue();
+        assertThat(jsonNode.has("attributes")).isTrue();
+        // event bytes should not be serialized
+        assertThat(jsonNode.has("event")).isFalse();
+        assertThat(jsonNode.get("eventType").asText()).isEqualTo(eventType);
+        
assertThat(jsonNode.get("eventString").asText()).isEqualTo(eventString);
+        
assertThat(jsonNode.get("attributes").get("testKey").asText()).isEqualTo("testValue");
+    }
+
+    @Test
+    void testEventLogRecordSerializationWithEventString() throws Exception {
+        // Given - simulate how PythonEvent is used in EventLogger
+        UUID eventId = UUID.randomUUID();
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put("source", "python");
+        byte[] eventBytes = "serialized_event".getBytes();
+        String eventType = "flink_agents.api.events.event.InputEvent";
+        String eventString = "InputEvent(input={'key': 'value', 'count': 42})";
+
+        PythonEvent pythonEvent =
+                new PythonEvent(eventId, attributes, eventBytes, eventType, 
eventString);
+        pythonEvent.setSourceTimestamp(1234567890L);
+
+        EventContext context = new EventContext(pythonEvent);
+        EventLogRecord record = new EventLogRecord(context, pythonEvent);
+
+        // When
+        String json = objectMapper.writeValueAsString(record);
+
+        // Then
+        JsonNode jsonNode = objectMapper.readTree(json);
+
+        // Verify context contains PythonEvent type
+        assertThat(jsonNode.get("context").get("eventType").asText())
+                
.isEqualTo("org.apache.flink.agents.runtime.python.event.PythonEvent");
+
+        // Verify event contains human-readable eventString
+        JsonNode eventNode = jsonNode.get("event");
+        
assertThat(eventNode.get("eventString").asText()).isEqualTo(eventString);
+        assertThat(eventNode.get("eventType").asText()).isEqualTo(eventType);
+        assertThat(eventNode.get("id").asText()).isEqualTo(eventId.toString());
+        // Byte array should not be in the log
+        assertThat(eventNode.has("event")).isFalse();
+    }
+}

Reply via email to