This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit 893e5232ea4dc1586bb8443e53285b150df0be62
Author: sxnan <[email protected]>
AuthorDate: Fri Jan 16 18:11:12 2026 +0800

    [log] Improve the event log content for PythonEvent
---
 .../e2e_tests_integration/event_log_test.py        | 100 ------------------
 .../python_event_logging_test.py                   |  63 +++++++----
 .../flink_agents/runtime/flink_runner_context.py   |   5 +-
 .../runtime/java/java_resource_wrapper.py          |   2 +-
 python/flink_agents/runtime/python_java_utils.py   |  20 +++-
 .../eventlog/EventLogRecordJsonDeserializer.java   |  37 +++++--
 .../eventlog/EventLogRecordJsonSerializer.java     | 116 ++++++++++++++++++---
 .../python/context/PythonRunnerContextImpl.java    |   4 +-
 .../agents/runtime/python/event/PythonEvent.java   |  26 ++---
 .../runtime/python/utils/PythonActionExecutor.java |   4 +-
 .../eventlog/EventLogRecordJsonSerdeTest.java      |  16 ++-
 .../runtime/eventlog/FileEventLoggerTest.java      |   2 +-
 .../runtime/python/event/PythonEventTest.java      |  34 +++---
 13 files changed, 231 insertions(+), 198 deletions(-)

diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/event_log_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/event_log_test.py
deleted file mode 100644
index 275f5a73..00000000
--- a/python/flink_agents/e2e_tests/e2e_tests_integration/event_log_test.py
+++ /dev/null
@@ -1,100 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-#################################################################################
-import json
-import os
-import sysconfig
-from pathlib import Path
-
-from pyflink.common import Configuration, Encoder, WatermarkStrategy
-from pyflink.common.typeinfo import Types
-from pyflink.datastream import RuntimeExecutionMode, StreamExecutionEnvironment
-from pyflink.datastream.connectors.file_system import (
-    FileSource,
-    StreamFormat,
-    StreamingFileSink,
-)
-
-from flink_agents.api.execution_environment import AgentsExecutionEnvironment
-from flink_agents.e2e_tests.e2e_tests_integration.flink_integration_agent import (
-    DataStreamAgent,
-    ItemData,
-    MyKeySelector,
-)
-
-current_dir = Path(__file__).parent
-os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
-
-
-def test_event_log_base_dir_flink(tmp_path: Path) -> None:  # noqa: D103
-    config = Configuration()
-    config.set_string("state.backend.type", "rocksdb")
-    config.set_string("checkpointing.interval", "1s")
-    config.set_string("restart-strategy.type", "disable")
-    env = StreamExecutionEnvironment.get_execution_environment(config)
-    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
-    env.set_parallelism(1)
-
-    input_datastream = env.from_source(
-        source=FileSource.for_record_stream_format(
-            StreamFormat.text_line_format(), f"file:///{current_dir}/../resources/input"
-        ).build(),
-        watermark_strategy=WatermarkStrategy.no_watermarks(),
-        source_name="event_log_test_source",
-    )
-
-    deserialize_datastream = input_datastream.map(
-        lambda x: ItemData.model_validate_json(x)
-    )
-
-    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
-    event_log_dir = tmp_path / "event_log"
-    agents_env.get_config().set_str("baseLogDir", str(event_log_dir))
-
-    output_datastream = (
-        agents_env.from_datastream(
-            input=deserialize_datastream, key_selector=MyKeySelector()
-        )
-        .apply(DataStreamAgent())
-        .to_datastream()
-    )
-
-    result_dir = tmp_path / "results"
-    result_dir.mkdir(parents=True, exist_ok=True)
-    output_datastream.map(lambda x: x.model_dump_json(), Types.STRING()).add_sink(
-        StreamingFileSink.for_row_format(
-            base_path=str(result_dir.absolute()),
-            encoder=Encoder.simple_string_encoder(),
-        ).build()
-    )
-
-    agents_env.execute()
-
-    event_logs = list(event_log_dir.glob("events-*.log"))
-    assert event_logs, "No event log files found in configured baseLogDir."
-
-    first_log = event_logs[0]
-    record = None
-    with first_log.open("r", encoding="utf-8") as handle:
-        for line in handle:
-            if line.strip():
-                record = json.loads(line)
-                break
-
-    assert record is not None, "Event log file is empty."
-    assert "context" in record
-    assert "event" in record
diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
index ee85aeb6..2db30804 100644
--- a/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
+++ b/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
@@ -16,7 +16,9 @@
 # limitations under the License.
 #################################################################################
 import json
+import os
 import shutil
+import sysconfig
 import tempfile
 from pathlib import Path
 
@@ -37,6 +39,8 @@ from flink_agents.api.events.event import InputEvent, OutputEvent
 from flink_agents.api.execution_environment import AgentsExecutionEnvironment
 from flink_agents.api.runner_context import RunnerContext
 
+os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
+
 
 class InputKeySelector(KeySelector):
     """Key selector for input data."""
@@ -60,9 +64,9 @@ class PythonEventLoggingAgent(Agent):
         )
 
 
-def test_python_event_logging() -> None:
-    """Test that PythonEvent can be logged with readable content."""
-    # Check that log files were created in the default location
+def test_python_event_logging(tmp_path: Path) -> None:
+    """Test event logs are written to configured directory with expected content."""
+    event_log_dir = tmp_path / "event_log"
     default_log_dir = Path(tempfile.gettempdir()) / "flink-agents"
     shutil.rmtree(default_log_dir, ignore_errors=True)
 
@@ -73,6 +77,7 @@ def test_python_event_logging() -> None:
 
     # Create agent environment
     agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+    agents_env.get_config().set_str("baseLogDir", str(event_log_dir))
 
     # Set up input source
     current_dir = Path(__file__).parent
@@ -96,28 +101,42 @@ def test_python_event_logging() -> None:
     # Execute the job
     agents_env.execute()
 
-    # Also check our custom log directory
-    log_files = []
-    if default_log_dir.exists():
-        log_files.extend(default_log_dir.glob("events-*.log"))
+    # Check that log files were created in configured directory
+    log_files = list(event_log_dir.glob("events-*.log"))
 
     # At least one log file should exist
     assert len(log_files) > 0, (
-        f"Event log files should be created in {default_log_dir}"
+        f"Event log files should be created in {event_log_dir}"
     )
 
-    # Check that log files contain readable PythonEvent content
-    log_content = ""
+    # Check that log files contain structured event content
+    record = None
+    record_line = None
+    has_processed_review = False
     for log_file in log_files:
-        with log_file.open() as f:
-            log_content += f.read()
-
-    print(log_content)
-
-    # Verify log contains expected content - should have readable event data via
-    # eventString
-    assert "processed_review" in log_content, (
-        "Log should contain processed event content from eventString"
-    )
-    assert "eventString" in log_content, "Log should contain eventString field"
-    assert "eventType" in log_content, "Log should contain event type information"
+        with log_file.open(encoding="utf-8") as handle:
+            for line in handle:
+                if line.strip():
+                    record = json.loads(line)
+                    record_line = line
+                    event_payload = record.get("event", {})
+                    if "processed_review" in json.dumps(event_payload):
+                        has_processed_review = True
+                        break
+        if record is not None and has_processed_review:
+            break
+
+    assert record is not None, "Event log file is empty."
+    assert record_line is not None, "Event log file is empty."
+    assert "timestamp" in record
+    assert "event" in record
+    assert "eventType" in record["event"]
+    assert has_processed_review, "Log should contain processed review content"
+
+    event_type_idx = record_line.find('"eventType"')
+    id_idx = record_line.find('"id"')
+    attributes_idx = record_line.find('"attributes"')
+    assert event_type_idx != -1
+    assert id_idx != -1
+    assert attributes_idx != -1
+    assert event_type_idx < id_idx < attributes_idx
diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py
index 994f095e..0e8664a0 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -44,6 +44,7 @@ from flink_agents.runtime.memory.internal_base_long_term_memory import (
 from flink_agents.runtime.memory.vector_store_long_term_memory import (
     VectorStoreLongTermMemory,
 )
+from flink_agents.runtime.python_java_utils import _build_event_log_string
 
 logger = logging.getLogger(__name__)
 
@@ -220,9 +221,9 @@ class FlinkRunnerContext(RunnerContext):
         """
         class_path = f"{event.__class__.__module__}.{event.__class__.__qualname__}"
         event_bytes = cloudpickle.dumps(event)
-        event_string = str(event)
+        event_json_str = _build_event_log_string(event, class_path)
         try:
-            self._j_runner_context.sendEvent(class_path, event_bytes, event_string)
+            self._j_runner_context.sendEvent(class_path, event_bytes, event_json_str)
         except Exception as e:
             err_msg = "Failed to send event " + class_path + " to runner context"
             raise RuntimeError(err_msg) from e
diff --git a/python/flink_agents/runtime/java/java_resource_wrapper.py b/python/flink_agents/runtime/java/java_resource_wrapper.py
index 56a101ca..0d1e8a46 100644
--- a/python/flink_agents/runtime/java/java_resource_wrapper.py
+++ b/python/flink_agents/runtime/java/java_resource_wrapper.py
@@ -17,7 +17,6 @@
 #################################################################################
 from typing import Any, List
 
-from pemja import findClass
 from pydantic import Field
 from typing_extensions import override
 
@@ -54,6 +53,7 @@ class JavaPrompt(Prompt):
     def format_messages(
         self, role: MessageRole = MessageRole.SYSTEM, **kwargs: str
     ) -> List[ChatMessage]:
+        from pemja import findClass
         j_MessageRole = findClass("org.apache.flink.agents.api.chat.messages.MessageRole")
         j_chat_messages = self.j_prompt.formatMessages(j_MessageRole.fromValue(role.value), kwargs)
         chatMessages = [ChatMessage(role=MessageRole(j_chat_message.getRole().getValue()),
diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py
index 509d0c51..dc320e7c 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/python_java_utils.py
@@ -16,13 +16,13 @@
 # limitations under the License.
 #################################################################################
 import importlib
+import json
 from typing import Any, Callable, Dict
 
 import cloudpickle
-from pemja import findClass
 
 from flink_agents.api.chat_message import ChatMessage, MessageRole
-from flink_agents.api.events.event import InputEvent
+from flink_agents.api.events.event import Event, InputEvent
 from flink_agents.api.resource import Resource, ResourceType, get_resource_class
 from flink_agents.api.tools.tool import ToolMetadata
 from flink_agents.api.tools.utils import create_model_from_java_tool_schema_str
@@ -46,14 +46,25 @@ def convert_to_python_object(bytesObject: bytes) -> Any:
     return cloudpickle.loads(bytesObject)
 
 
+def _build_event_log_string(event: InputEvent | Event, event_type: str) -> str:
+    try:
+        payload = json.loads(event.model_dump_json())
+        payload["eventType"] = event_type
+        payload.setdefault("attributes", {})
+        return json.dumps(payload)
+    except Exception:
+        return str(event)
+
+
 def wrap_to_input_event(bytesObject: bytes) -> tuple[bytes, str]:
     """Wrap data to python input event and serialize.
 
     Returns:
-        A tuple of (serialized_event_bytes, event_string_representation)
+        A tuple of (serialized_event_bytes, event_json_str)
     """
     event = InputEvent(input=cloudpickle.loads(bytesObject))
-    return (cloudpickle.dumps(event), str(event))
+    event_type = f"{event.__class__.__module__}.{event.__class__.__qualname__}"
+    return (cloudpickle.dumps(event), _build_event_log_string(event, event_type))
 
 
 def get_output_from_output_event(bytesObject: bytes) -> Any:
@@ -166,6 +177,7 @@ def from_java_chat_message(j_chat_message: Any) -> ChatMessage:
 
 def to_java_chat_message(chat_message: ChatMessage) -> Any:
     """Convert a chat message to a java chat message."""
+    from pemja import findClass
     j_ChatMessage = findClass("org.apache.flink.agents.api.chat.messages.ChatMessage")
     j_chat_message = j_ChatMessage()
 
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
index f2b89d3a..a516edf5 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.api.EventContext;
 
@@ -51,24 +52,21 @@ public class EventLogRecordJsonDeserializer extends 
JsonDeserializer<EventLogRec
         ObjectMapper mapper = (ObjectMapper) parser.getCodec();
         JsonNode rootNode = mapper.readTree(parser);
 
-        // Deserialize context
-        JsonNode contextNode = rootNode.get("context");
-        if (contextNode == null) {
-            throw new IOException("Missing 'context' field in EventLogRecord 
JSON");
+        // Deserialize timestamp
+        JsonNode timestampNode = rootNode.get("timestamp");
+        if (timestampNode == null || !timestampNode.isTextual()) {
+            throw new IOException("Missing 'timestamp' field in EventLogRecord JSON");
         }
 
-        EventContext eventContext = mapper.treeToValue(contextNode, 
EventContext.class);
-        if (eventContext == null) {
-            throw new IOException("Failed to deserialize EventContext");
-        }
-
-        // Deserialize event using eventType from context
+        // Deserialize event using eventType from event node
         JsonNode eventNode = rootNode.get("event");
         if (eventNode == null) {
             throw new IOException("Missing 'event' field in EventLogRecord JSON");
         }
+        String eventType = getEventType(eventNode);
 
-        Event event = deserializeEvent(mapper, eventNode, 
eventContext.getEventType());
+        Event event = deserializeEvent(mapper, stripEventType(eventNode), 
eventType);
+        EventContext eventContext = new EventContext(eventType, 
timestampNode.asText());
 
         return new EventLogRecord(eventContext, event);
     }
@@ -102,4 +100,21 @@ public class EventLogRecordJsonDeserializer extends 
JsonDeserializer<EventLogRec
                     String.format("Failed to deserialize event of type '%s'", 
eventType), e);
         }
     }
+
+    private static String getEventType(JsonNode eventNode) throws IOException {
+        JsonNode eventTypeNode = eventNode.get("eventType");
+        if (eventTypeNode == null || !eventTypeNode.isTextual()) {
+            throw new IOException("Missing 'eventType' field in event JSON");
+        }
+        return eventTypeNode.asText();
+    }
+
+    private static JsonNode stripEventType(JsonNode eventNode) {
+        if (eventNode.isObject()) {
+            ObjectNode copy = ((ObjectNode) eventNode).deepCopy();
+            copy.remove("eventType");
+            return copy;
+        }
+        return eventNode;
+    }
 }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
index 89a4e0f7..db7bebd7 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
@@ -19,10 +19,17 @@
 package org.apache.flink.agents.runtime.eventlog;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.runtime.python.event.PythonEvent;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
 
 /**
  * Custom JSON serializer for {@link EventLogRecord}.
@@ -31,7 +38,7 @@ import java.io.IOException;
  * for structured logging. The serialization includes:
  *
  * <ul>
- *   <li>EventContext with eventType and timestamp information
+ *   <li>Top-level timestamp
  *   <li>Event data serialized as a standard JSON object
  * </ul>
  *
@@ -39,11 +46,9 @@ import java.io.IOException;
  *
  * <pre>{@code
  * {
- *   "context": {
- *     "eventType": "org.apache.flink.agents.api.InputEvent",
- *     "timestamp": "2024-01-15T10:30:00Z"
- *   },
+ *   "timestamp": "2024-01-15T10:30:00Z",
  *   "event": {
+ *     "eventType": "org.apache.flink.agents.api.InputEvent"
  *     // Event-specific fields serialized normally
  *   }
  * }
@@ -55,16 +60,103 @@ public class EventLogRecordJsonSerializer extends 
JsonSerializer<EventLogRecord>
     public void serialize(EventLogRecord record, JsonGenerator gen, 
SerializerProvider serializers)
             throws IOException {
 
-        gen.writeStartObject();
+        ObjectMapper mapper = (ObjectMapper) gen.getCodec();
+        if (mapper == null) {
+            mapper = new ObjectMapper();
+        }
 
-        // Serialize context - contains eventType and timestamp
-        gen.writeFieldName("context");
-        serializers.defaultSerializeValue(record.getContext(), gen);
+        gen.writeStartObject();
+        gen.writeStringField("timestamp", record.getContext().getTimestamp());
 
-        // Serialize event - standard JSON serialization
         gen.writeFieldName("event");
-        serializers.defaultSerializeValue(record.getEvent(), gen);
-
+        JsonNode eventNode = buildEventNode(record.getEvent(), mapper);
+        if (!eventNode.isObject()) {
+            throw new IllegalStateException(
+                    "Event log payload must be a JSON object, but was: " + eventNode.getNodeType());
+        }
+        eventNode = reorderEventFields((ObjectNode) eventNode, 
record.getEvent(), mapper);
+        gen.writeTree(eventNode);
         gen.writeEndObject();
     }
+
+    private JsonNode buildEventNode(Event event, ObjectMapper mapper) {
+        if (event instanceof PythonEvent) {
+            return buildPythonEventNode((PythonEvent) event, mapper);
+        }
+        JsonNode eventNode = mapper.valueToTree(event);
+        if (eventNode.isObject()) {
+            ObjectNode objectNode = (ObjectNode) eventNode;
+            objectNode.put("eventType", event.getClass().getName());
+            objectNode.remove("sourceTimestamp");
+        }
+        return eventNode;
+    }
+
+    private JsonNode buildPythonEventNode(PythonEvent event, ObjectMapper 
mapper) {
+        String eventJsonStr = event.getEventJsonStr();
+        if (eventJsonStr != null) {
+            try {
+                JsonNode parsed = mapper.readTree(eventJsonStr);
+                if (parsed.isObject()) {
+                    ObjectNode objectNode = (ObjectNode) parsed;
+                    objectNode.remove("sourceTimestamp");
+                    return objectNode;
+                }
+                return parsed;
+            } catch (IOException ignored) {
+                // Fallback to raw eventJsonStr
+            }
+        }
+        ObjectNode fallback = mapper.createObjectNode();
+        if (event.getEventType() != null) {
+            fallback.put("eventType", event.getEventType());
+        }
+        fallback.put("id", event.getId().toString());
+        fallback.put("rawEventJsonStr", eventJsonStr);
+        return fallback;
+    }
+
+    private ObjectNode reorderEventFields(ObjectNode original, Event event, 
ObjectMapper mapper) {
+        ObjectNode ordered = mapper.createObjectNode();
+
+        JsonNode eventTypeNode = original.get("eventType");
+        if (eventTypeNode != null) {
+            ordered.set("eventType", eventTypeNode);
+        } else if (event instanceof PythonEvent) {
+            String eventType = ((PythonEvent) event).getEventType();
+            if (eventType != null) {
+                ordered.put("eventType", eventType);
+            }
+        } else {
+            ordered.put("eventType", event.getClass().getName());
+        }
+
+        JsonNode idNode = original.get("id");
+        if (idNode != null) {
+            ordered.set("id", idNode);
+        } else if (event.getId() != null) {
+            ordered.put("id", event.getId().toString());
+        }
+
+        JsonNode attributesNode = original.get("attributes");
+        if (attributesNode != null) {
+            ordered.set("attributes", attributesNode);
+        } else {
+            ordered.putObject("attributes");
+        }
+
+        Iterator<Map.Entry<String, JsonNode>> fields = original.fields();
+        while (fields.hasNext()) {
+            Map.Entry<String, JsonNode> entry = fields.next();
+            String fieldName = entry.getKey();
+            if ("sourceTimestamp".equals(fieldName)) {
+                continue;
+            }
+            if (!ordered.has(fieldName)) {
+                ordered.set(fieldName, entry.getValue());
+            }
+        }
+
+        return ordered;
+    }
 }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
index ddabf503..3d37b05a 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
@@ -46,8 +46,8 @@ public class PythonRunnerContextImpl extends 
RunnerContextImpl {
         super.sendEvent(event);
     }
 
-    public void sendEvent(String type, byte[] event, String eventString) {
+    public void sendEvent(String type, byte[] event, String eventJsonStr) {
         // this method will be invoked by PythonActionExecutor's python 
interpreter.
-        sendEvent(new PythonEvent(event, type, eventString));
+        sendEvent(new PythonEvent(event, type, eventJsonStr));
     }
 }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
index 8f5248d1..2abe7cb6 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
@@ -33,19 +33,19 @@ import java.util.UUID;
  * An event generated by the framework, passing a Python event to the Java 
agent runner.
  *
  * <p>This class stores Python objects as serialized byte arrays for 
processing, and also maintains
- * a human-readable string representation (eventString) for logging purposes. 
The eventString is
- * generated on the Python side when the event is created.
+ * a JSON string representation (eventJsonStr) for logging purposes. The 
eventJsonStr is generated
+ * on the Python side when the event is created.
  */
 public class PythonEvent extends Event {
     private final byte[] event;
     private final String eventType;
-    private final String eventString;
+    private final String eventJsonStr;
 
-    public PythonEvent(byte[] event, String eventType, String eventString) {
+    public PythonEvent(byte[] event, String eventType, String eventJsonStr) {
         super();
         this.event = event;
         this.eventType = eventType;
-        this.eventString = eventString;
+        this.eventJsonStr = eventJsonStr;
     }
 
     @JsonCreator
@@ -54,11 +54,11 @@ public class PythonEvent extends Event {
             @JsonProperty("attributes") Map<String, Object> attributes,
             @JsonProperty("event") byte[] event,
             @JsonProperty("eventType") String eventType,
-            @JsonProperty("eventString") String eventString) {
+            @JsonProperty("eventJsonStr") String eventJsonStr) {
         super(id, attributes);
         this.event = event;
         this.eventType = eventType;
-        this.eventString = eventString;
+        this.eventJsonStr = eventJsonStr;
     }
 
     @JsonIgnore // Don't serialize byte array in logs - used for processing 
only
@@ -71,15 +71,15 @@ public class PythonEvent extends Event {
     }
 
     /**
-     * Returns the human-readable string representation of this event.
+     * Returns the JSON string representation of this event.
      *
      * <p>This string is generated on the Python side when the event is 
created and is primarily
      * used for logging purposes.
      *
-     * @return the string representation of the event, or null if not available
+     * @return the JSON string representation of the event, or null if not 
available
      */
-    public String getEventString() {
-        return eventString;
+    public String getEventJsonStr() {
+        return eventJsonStr;
     }
 
     @Override
@@ -92,11 +92,11 @@ public class PythonEvent extends Event {
         PythonEvent other = (PythonEvent) o;
         return Arrays.equals(event, other.event)
                 && Objects.equals(this.eventType, other.eventType)
-                && Objects.equals(this.eventString, other.eventString);
+                && Objects.equals(this.eventJsonStr, other.eventJsonStr);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), Arrays.hashCode(event), 
eventType, eventString);
+        return Objects.hash(super.hashCode(), Arrays.hashCode(event), 
eventType, eventJsonStr);
     }
 }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 55d38187..d47d8f58 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -146,8 +146,8 @@ public class PythonActionExecutor {
         checkState(result.getClass().isArray() && ((Object[]) result).length 
== 2);
         Object[] resultArray = (Object[]) result;
         byte[] eventBytes = (byte[]) resultArray[0];
-        String eventString = (String) resultArray[1];
-        return new PythonEvent(eventBytes, EventUtil.PYTHON_INPUT_EVENT_NAME, 
eventString);
+        String eventJsonStr = (String) resultArray[1];
+        return new PythonEvent(eventBytes, EventUtil.PYTHON_INPUT_EVENT_NAME, 
eventJsonStr);
     }
 
     public Object getOutputFromOutputEvent(byte[] pythonOutputEvent) {
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerdeTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerdeTest.java
index 6f695bd6..14a3aa87 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerdeTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerdeTest.java
@@ -56,20 +56,16 @@ class EventLogRecordJsonSerdeTest {
         JsonNode jsonNode = objectMapper.readTree(json);
 
         // Verify structure
-        assertTrue(jsonNode.has("context"));
+        assertTrue(jsonNode.has("timestamp"));
         assertTrue(jsonNode.has("event"));
 
-        // Verify context
-        JsonNode contextNode = jsonNode.get("context");
-        assertTrue(contextNode.has("eventType"));
-        assertTrue(contextNode.has("timestamp"));
-        assertEquals(
-                "org.apache.flink.agents.api.InputEvent", 
contextNode.get("eventType").asText());
-
         // Verify event
         JsonNode eventNode = jsonNode.get("event");
+        assertTrue(eventNode.has("eventType"));
         assertTrue(eventNode.has("input"));
         assertEquals("test input data", eventNode.get("input").asText());
+        assertEquals("org.apache.flink.agents.api.InputEvent", 
eventNode.get("eventType").asText());
+        assertFalse(eventNode.has("sourceTimestamp"));
     }
 
     @Test
@@ -86,7 +82,7 @@ class EventLogRecordJsonSerdeTest {
         JsonNode jsonNode = objectMapper.readTree(json);
         assertEquals(
                 "org.apache.flink.agents.api.OutputEvent",
-                jsonNode.get("context").get("eventType").asText());
+                jsonNode.get("event").get("eventType").asText());
         assertEquals("test output data", 
jsonNode.get("event").get("output").asText());
     }
 
@@ -104,7 +100,7 @@ class EventLogRecordJsonSerdeTest {
         JsonNode jsonNode = objectMapper.readTree(json);
         assertEquals(
                 
"org.apache.flink.agents.runtime.eventlog.EventLogRecordJsonSerdeTest$CustomTestEvent",
-                jsonNode.get("context").get("eventType").asText());
+                jsonNode.get("event").get("eventType").asText());
 
         JsonNode eventNode = jsonNode.get("event");
         assertEquals("custom data", eventNode.get("customData").asText());
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
index 1a77ccf0..1cd4d0ca 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
@@ -178,7 +178,7 @@ class FileEventLoggerTest {
         JsonNode jsonNode = objectMapper.readTree(lines.get(0));
         assertEquals(
                 "org.apache.flink.agents.runtime.eventlog.FileEventLoggerTest$TestCustomEvent",
-                jsonNode.get("context").get("eventType").asText());
+                jsonNode.get("event").get("eventType").asText());
 
         JsonNode eventNode = jsonNode.get("event");
         assertEquals("custom data", eventNode.get("customData").asText());
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
index e3a21c57..597f1014 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
@@ -43,33 +43,36 @@ class PythonEventTest {
     }
 
     @Test
-    void testCreatePythonEventWithEventString() {
+    void testCreatePythonEventWithEventJsonStr() {
         // Given
         byte[] eventBytes = new byte[] {1, 2, 3, 4, 5};
         String eventType = "flink_agents.api.events.event.InputEvent";
-        String eventString = "InputEvent(input='test data')";
+        String eventJsonStr =
+                "{\"eventType\":\"flink_agents.api.events.event.InputEvent\",\"input\":\"test data\"}";
 
         // When
-        PythonEvent event = new PythonEvent(eventBytes, eventType, eventString);
+        PythonEvent event = new PythonEvent(eventBytes, eventType, eventJsonStr);
 
         // Then
         assertThat(event.getEvent()).isEqualTo(eventBytes);
         assertThat(event.getEventType()).isEqualTo(eventType);
-        assertThat(event.getEventString()).isEqualTo(eventString);
+        assertThat(event.getEventJsonStr()).isEqualTo(eventJsonStr);
     }
 
     @Test
-    void testJsonSerializationWithEventString() throws Exception {
+    void testJsonSerializationWithEventJsonStr() throws Exception {
         // Given
         UUID expectedId = UUID.randomUUID();
         Map<String, Object> expectedAttributes = new HashMap<>();
         expectedAttributes.put("testKey", "testValue");
         byte[] eventBytes = "test_bytes".getBytes();
         String eventType = "flink_agents.api.events.event.OutputEvent";
-        String eventString = "OutputEvent(output={'key': 'value'})";
+        String eventJsonStr =
+                "{\"eventType\":\"flink_agents.api.events.event.OutputEvent\",\"output\":{\"key\":\"value\"}}";
 
         PythonEvent event =
-                new PythonEvent(expectedId, expectedAttributes, eventBytes, eventType, eventString);
+                new PythonEvent(
+                        expectedId, expectedAttributes, eventBytes, eventType, eventJsonStr);
 
         // When
         String json = objectMapper.writeValueAsString(event);
@@ -78,27 +81,28 @@ class PythonEventTest {
         JsonNode jsonNode = objectMapper.readTree(json);
         assertThat(jsonNode.has("id")).isTrue();
         assertThat(jsonNode.has("eventType")).isTrue();
-        assertThat(jsonNode.has("eventString")).isTrue();
+        assertThat(jsonNode.has("eventJsonStr")).isTrue();
         assertThat(jsonNode.has("attributes")).isTrue();
         // event bytes should not be serialized
         assertThat(jsonNode.has("event")).isFalse();
         assertThat(jsonNode.get("eventType").asText()).isEqualTo(eventType);
-        assertThat(jsonNode.get("eventString").asText()).isEqualTo(eventString);
+        assertThat(jsonNode.get("eventJsonStr").asText()).isEqualTo(eventJsonStr);
         assertThat(jsonNode.get("attributes").get("testKey").asText()).isEqualTo("testValue");
     }
 
     @Test
-    void testEventLogRecordSerializationWithEventString() throws Exception {
+    void testEventLogRecordSerializationWithEventJsonStr() throws Exception {
         // Given - simulate how PythonEvent is used in EventLogger
         UUID eventId = UUID.randomUUID();
         Map<String, Object> attributes = new HashMap<>();
         attributes.put("source", "python");
         byte[] eventBytes = "serialized_event".getBytes();
         String eventType = "flink_agents.api.events.event.InputEvent";
-        String eventString = "InputEvent(input={'key': 'value', 'count': 42})";
+        String eventJsonStr =
+                "{\"eventType\":\"flink_agents.api.events.event.InputEvent\",\"input\":{\"key\":\"value\",\"count\":42}}";
 
         PythonEvent pythonEvent =
-                new PythonEvent(eventId, attributes, eventBytes, eventType, eventString);
+                new PythonEvent(eventId, attributes, eventBytes, eventType, eventJsonStr);
         pythonEvent.setSourceTimestamp(1234567890L);
 
         EventContext context = new EventContext(pythonEvent);
@@ -110,13 +114,7 @@ class PythonEventTest {
         // Then
         JsonNode jsonNode = objectMapper.readTree(json);
 
-        // Verify context contains PythonEvent type
-        assertThat(jsonNode.get("context").get("eventType").asText())
-                .isEqualTo("org.apache.flink.agents.runtime.python.event.PythonEvent");
-
-        // Verify event contains human-readable eventString
         JsonNode eventNode = jsonNode.get("event");
-        assertThat(eventNode.get("eventString").asText()).isEqualTo(eventString);
         assertThat(eventNode.get("eventType").asText()).isEqualTo(eventType);
         assertThat(eventNode.get("id").asText()).isEqualTo(eventId.toString());
         // Byte array should not be in the log


Reply via email to