This is an automated email from the ASF dual-hosted git repository.
sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new db9bad9 [hotfix][runtime] EventLogger displays the content of PythonEvent as string (#367)
db9bad9 is described below
commit db9bad9e35d40b938f8def2f43ceb03c27a47df2
Author: Xuannan <[email protected]>
AuthorDate: Tue Dec 2 16:01:37 2025 +0800
[hotfix][runtime] EventLogger displays the content of PythonEvent as string (#367)

* [hotfix][runtime] EventLogger displays the content of PythonEvent as string
---
.../e2e_tests/python_event_logging_test.py | 123 ++++++++++++++++++++
.../flink_agents/runtime/flink_runner_context.py | 4 +-
python/flink_agents/runtime/python_java_utils.py | 11 +-
.../python/context/PythonRunnerContextImpl.java | 4 +-
.../agents/runtime/python/event/PythonEvent.java | 36 +++++-
.../runtime/python/utils/PythonActionExecutor.java | 10 +-
.../runtime/python/event/PythonEventTest.java | 125 +++++++++++++++++++++
7 files changed, 299 insertions(+), 14 deletions(-)
diff --git a/python/flink_agents/e2e_tests/python_event_logging_test.py b/python/flink_agents/e2e_tests/python_event_logging_test.py
new file mode 100644
index 0000000..3486fa5
--- /dev/null
+++ b/python/flink_agents/e2e_tests/python_event_logging_test.py
@@ -0,0 +1,123 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import json
+import shutil
+import tempfile
+from pathlib import Path
+
+from pyflink.common import Configuration, WatermarkStrategy
+from pyflink.datastream import (
+ KeySelector,
+ RuntimeExecutionMode,
+ StreamExecutionEnvironment,
+)
+from pyflink.datastream.connectors.file_system import (
+ FileSource,
+ StreamFormat,
+)
+
+from flink_agents.api.agent import Agent
+from flink_agents.api.decorators import action
+from flink_agents.api.events.event import InputEvent, OutputEvent
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.api.runner_context import RunnerContext
+
+
+class InputKeySelector(KeySelector):
+    """Key selector that partitions input records by their ``id`` field."""
+
+    def get_key(self, value: dict) -> int:
+        """Return the record's ``id``, or ``0`` when the key is absent."""
+        return value.get("id", 0)
+
+
+class PythonEventLoggingAgent(Agent):
+    """Agent used to check that emitted Python events are logged readably."""
+
+    @action(InputEvent)
+    @staticmethod
+    def process_input(event: InputEvent, ctx: RunnerContext) -> None:
+        """Echo the input's ``review`` field back as an ``OutputEvent``."""
+        # The runtime forwards this to Java as a PythonEvent that carries
+        # str(event), which is what the event log is expected to show.
+        review = str(event.input["review"])
+        output = {"processed_review": review}
+        ctx.send_event(OutputEvent(output=output))
+
+
+def test_python_event_logging() -> None:
+    """Run a small agent job and check PythonEvents are logged as readable text.
+
+    Python events reach the Java event logger as pickled bytes, so readable
+    content must come from the ``eventString`` built with ``str(event)``.
+    """
+    # Start from an empty default event-log directory (<tmp>/flink-agents).
+    default_log_dir = Path(tempfile.gettempdir()) / "flink-agents"
+    shutil.rmtree(default_log_dir, ignore_errors=True)
+
+    config = Configuration()
+    env = StreamExecutionEnvironment.get_execution_environment(config)
+    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+    env.set_parallelism(1)
+
+    # Create agent environment
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+
+    # Set up input source
+    current_dir = Path(__file__).parent
+    input_datastream = env.from_source(
+        source=FileSource.for_record_stream_format(
+            StreamFormat.text_line_format(),
+            f"file:///{current_dir}/resources/input/input_data.txt",
+        ).build(),
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="python_event_logging_test",
+    )
+
+    # Parse each JSON line into a dict
+    deserialize_datastream = input_datastream.map(json.loads)
+
+    # Apply agent
+    agents_env.from_datastream(
+        input=deserialize_datastream, key_selector=InputKeySelector()
+    ).apply(PythonEventLoggingAgent()).to_datastream()
+
+    # Execute the job
+    agents_env.execute()
+
+    # Collect the event log files written by the job
+    log_files = (
+        sorted(default_log_dir.glob("events-*.log"))
+        if default_log_dir.exists()
+        else []
+    )
+
+    # At least one log file should exist
+    assert len(log_files) > 0, (
+        f"Event log files should be created in {default_log_dir}"
+    )
+
+    # Concatenate every log file so the checks below see all records
+    log_content = "".join(path.read_text(encoding="utf-8") for path in log_files)
+
+    # Readable content must come through eventString, not pickled bytes
+    assert "processed_review" in log_content, (
+        "Log should contain processed event content from eventString"
+    )
+    assert "eventString" in log_content, "Log should contain eventString field"
+    assert "eventType" in log_content, "Log should contain event type information"
diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py
index c226d18..56f2ebb 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -64,8 +64,12 @@ class FlinkRunnerContext(RunnerContext):
             The event to be processed by the agent system.
         """
         class_path = f"{event.__class__.__module__}.{event.__class__.__qualname__}"
         try:
-            self._j_runner_context.sendEvent(class_path, cloudpickle.dumps(event))
+            # Serialize inside the try so a failing pickle or __str__ surfaces as
+            # the same RuntimeError that callers already get for a failed send.
+            event_bytes = cloudpickle.dumps(event)
+            event_string = str(event)
+            self._j_runner_context.sendEvent(class_path, event_bytes, event_string)
         except Exception as e:
             err_msg = "Failed to send event " + class_path + " to runner context"
             raise RuntimeError(err_msg) from e
diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py
index 4190c55..586e6cb 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/python_java_utils.py
@@ -27,9 +27,14 @@ def convert_to_python_object(bytesObject: bytes) -> Any:
     return cloudpickle.loads(bytesObject)
 
 
-def wrap_to_input_event(bytesObject: bytes) -> bytes:
-    """Wrap data to python input event and serialize."""
-    return cloudpickle.dumps(InputEvent(input=cloudpickle.loads(bytesObject)))
+def wrap_to_input_event(bytesObject: bytes) -> tuple[bytes, str]:
+    """Wrap the unpickled payload in an ``InputEvent`` and serialize it.
+
+    Returns:
+        ``(pickled_event, str(event))``; the string form is used for logging.
+    """
+    wrapped = InputEvent(input=cloudpickle.loads(bytesObject))
+    return cloudpickle.dumps(wrapped), str(wrapped)
 
 
 def get_output_from_output_event(bytesObject: bytes) -> Any:
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
index 4bdb8d8..26d5a79 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
@@ -58,9 +58,9 @@ public class PythonRunnerContextImpl extends
RunnerContextImpl {
super.sendEvent(event);
}
- public void sendEvent(String type, byte[] event) {
+ public void sendEvent(String type, byte[] event, String eventString) {
// this method will be invoked by PythonActionExecutor's python
interpreter.
- sendEvent(new PythonEvent(event, type));
+ sendEvent(new PythonEvent(event, type, eventString));
}
public PythonActionExecutor getPythonActionExecutor() {
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
index 1a99e97..225e3e9 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
@@ -21,6 +21,7 @@ package org.apache.flink.agents.runtime.python.event;
import org.apache.flink.agents.api.Event;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Arrays;
@@ -28,15 +29,23 @@ import java.util.Map;
import java.util.Objects;
import java.util.UUID;
-/** An event generated by the framework, passing a Python event to the Java
agent runner. */
+/**
+ * An event generated by the framework, passing a Python event to the Java
agent runner.
+ *
+ * <p>This class stores Python objects as serialized byte arrays for
processing, and also maintains
+ * a human-readable string representation (eventString) for logging purposes.
The eventString is
+ * generated on the Python side when the event is created.
+ */
public class PythonEvent extends Event {
private final byte[] event;
private final String eventType;
+ private final String eventString;
- public PythonEvent(byte[] event, String eventType) {
+ public PythonEvent(byte[] event, String eventType, String eventString) {
super();
this.event = event;
this.eventType = eventType;
+ this.eventString = eventString;
}
@JsonCreator
@@ -44,12 +53,15 @@ public class PythonEvent extends Event {
@JsonProperty("id") UUID id,
@JsonProperty("attributes") Map<String, Object> attributes,
@JsonProperty("event") byte[] event,
- @JsonProperty("eventType") String eventType) {
+ @JsonProperty("eventType") String eventType,
+ @JsonProperty("eventString") String eventString) {
super(id, attributes);
this.event = event;
this.eventType = eventType;
+ this.eventString = eventString;
}
+ @JsonIgnore // Don't serialize byte array in logs - used for processing
only
public byte[] getEvent() {
return event;
}
@@ -58,6 +70,18 @@ public class PythonEvent extends Event {
return eventType;
}
+ /**
+ * Returns the human-readable string representation of this event.
+ *
+ * <p>This string is generated on the Python side when the event is
created and is primarily
+ * used for logging purposes.
+ *
+ * @return the string representation of the event, or null if not available
+ */
+ public String getEventString() {
+ return eventString;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -66,11 +90,13 @@ public class PythonEvent extends Event {
return false;
}
PythonEvent other = (PythonEvent) o;
- return Arrays.equals(event, other.event) &&
Objects.equals(this.eventType, other.eventType);
+ return Arrays.equals(event, other.event)
+ && Objects.equals(this.eventType, other.eventType)
+ && Objects.equals(this.eventString, other.eventString);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), Arrays.hashCode(event),
eventType);
+ return Objects.hash(super.hashCode(), Arrays.hashCode(event),
eventType, eventString);
}
}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 014d450..ded0140 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -124,9 +124,18 @@ public class PythonActionExecutor {
 
     public PythonEvent wrapToInputEvent(Object eventData) {
         checkState(eventData instanceof byte[]);
-        return new PythonEvent(
-                (byte[]) interpreter.invoke(WRAP_TO_INPUT_EVENT, eventData),
-                EventUtil.PYTHON_INPUT_EVENT_NAME);
+        // wrap_to_input_event returns a Python tuple (bytes, str), which is
+        // expected to arrive here as an Object[] of length 2.
+        Object result = interpreter.invoke(WRAP_TO_INPUT_EVENT, eventData);
+        // instanceof is null-safe and rejects primitive arrays such as the byte[] the
+        // old wrap_to_input_event returned, failing here instead of with a cast error.
+        checkState(
+                result instanceof Object[] && ((Object[]) result).length == 2,
+                "wrap_to_input_event should return a (bytes, str) tuple.");
+        Object[] resultArray = (Object[]) result;
+        byte[] eventBytes = (byte[]) resultArray[0];
+        String eventString = (String) resultArray[1];
+        return new PythonEvent(eventBytes, EventUtil.PYTHON_INPUT_EVENT_NAME, eventString);
     }
 
     public Object getOutputFromOutputEvent(byte[] pythonOutputEvent) {
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
new file mode 100644
index 0000000..600583e
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.agents.runtime.python.event;
+
+import org.apache.flink.agents.api.EventContext;
+import org.apache.flink.agents.runtime.eventlog.EventLogRecord;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link PythonEvent}. */
+class PythonEventTest {
+
+    private ObjectMapper objectMapper;
+
+    @BeforeEach
+    void setUp() {
+        objectMapper = new ObjectMapper();
+    }
+
+    @Test
+    void testCreatePythonEventWithEventString() {
+        // Given
+        byte[] eventBytes = new byte[] {1, 2, 3, 4, 5};
+        String eventType = "flink_agents.api.events.event.InputEvent";
+        String eventString = "InputEvent(input='test data')";
+
+        // When
+        PythonEvent event = new PythonEvent(eventBytes, eventType, eventString);
+
+        // Then
+        assertThat(event.getEvent()).isEqualTo(eventBytes);
+        assertThat(event.getEventType()).isEqualTo(eventType);
+        assertThat(event.getEventString()).isEqualTo(eventString);
+    }
+
+    @Test
+    void testJsonSerializationWithEventString() throws Exception {
+        // Given
+        UUID expectedId = UUID.randomUUID();
+        Map<String, Object> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("testKey", "testValue");
+        byte[] eventBytes = "test_bytes".getBytes(StandardCharsets.UTF_8);
+        String eventType = "flink_agents.api.events.event.OutputEvent";
+        String eventString = "OutputEvent(output={'key': 'value'})";
+
+        PythonEvent event =
+                new PythonEvent(expectedId, expectedAttributes, eventBytes, eventType, eventString);
+
+        // When
+        String json = objectMapper.writeValueAsString(event);
+
+        // Then
+        JsonNode jsonNode = objectMapper.readTree(json);
+        assertThat(jsonNode.has("id")).isTrue();
+        assertThat(jsonNode.has("eventType")).isTrue();
+        assertThat(jsonNode.has("eventString")).isTrue();
+        assertThat(jsonNode.has("attributes")).isTrue();
+        // event bytes should not be serialized
+        assertThat(jsonNode.has("event")).isFalse();
+        assertThat(jsonNode.get("eventType").asText()).isEqualTo(eventType);
+        assertThat(jsonNode.get("eventString").asText()).isEqualTo(eventString);
+        assertThat(jsonNode.get("attributes").get("testKey").asText()).isEqualTo("testValue");
+    }
+
+    @Test
+    void testEventLogRecordSerializationWithEventString() throws Exception {
+        // Given - simulate how PythonEvent is used in EventLogger
+        UUID eventId = UUID.randomUUID();
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put("source", "python");
+        byte[] eventBytes = "serialized_event".getBytes(StandardCharsets.UTF_8);
+        String eventType = "flink_agents.api.events.event.InputEvent";
+        String eventString = "InputEvent(input={'key': 'value', 'count': 42})";
+
+        PythonEvent pythonEvent =
+                new PythonEvent(eventId, attributes, eventBytes, eventType, eventString);
+        pythonEvent.setSourceTimestamp(1234567890L);
+
+        EventContext context = new EventContext(pythonEvent);
+        EventLogRecord record = new EventLogRecord(context, pythonEvent);
+
+        // When
+        String json = objectMapper.writeValueAsString(record);
+
+        // Then
+        JsonNode jsonNode = objectMapper.readTree(json);
+
+        // Verify context contains PythonEvent type
+        assertThat(jsonNode.get("context").get("eventType").asText())
+                .isEqualTo("org.apache.flink.agents.runtime.python.event.PythonEvent");
+
+        // Verify event contains human-readable eventString
+        JsonNode eventNode = jsonNode.get("event");
+        assertThat(eventNode.get("eventString").asText()).isEqualTo(eventString);
+        assertThat(eventNode.get("eventType").asText()).isEqualTo(eventType);
+        assertThat(eventNode.get("id").asText()).isEqualTo(eventId.toString());
+        // Byte array should not be in the log
+        assertThat(eventNode.has("event")).isFalse();
+    }
+}