This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 2a4c858b [api] Defensive-copy collection inputs in user-constructed
events + add contract test (#677)
2a4c858b is described below
commit 2a4c858bf51c1b293c417a46db2362f2a8f32d2d
Author: Avichay Marciano <[email protected]>
AuthorDate: Sat May 16 15:55:05 2026 +0300
[api] Defensive-copy collection inputs in user-constructed events + add
contract test (#677)
Wrap the List arg in new ArrayList<>() in the constructors of the events
that user code instantiates directly: ChatRequestEvent (messages) and
ContextRetrievalResponseEvent (documents). This avoids the JDK 17+ Kryo
InaccessibleObjectException when callers pass List.of(...) or any other
ImmutableCollections instance.
ToolRequestEvent / ToolResponseEvent are intentionally not changed -
they are produced only by built-in framework actions, never by user code,
so adding defensive copies there only adds overhead. (And shallow copies
wouldn't help anyway: tool-call payloads can carry nested Map.of(...).)
EventKryoSerializationTest in the e2e integration module asserts the
defensive-copy contract directly: the stored collection must be mutable.
Two tests, one per affected event.
Co-authored-by: Avichay Marciano <[email protected]>
---
.../flink/agents/api/event/ChatRequestEvent.java | 2 +-
.../api/event/ContextRetrievalResponseEvent.java | 2 +-
.../test/EventKryoSerializationTest.java | 83 ++++++++++++++++++++++
3 files changed, 85 insertions(+), 2 deletions(-)
diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ChatRequestEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ChatRequestEvent.java
index 9c1d2e3c..b1cdcdbc 100644
--- a/api/src/main/java/org/apache/flink/agents/api/event/ChatRequestEvent.java
+++ b/api/src/main/java/org/apache/flink/agents/api/event/ChatRequestEvent.java
@@ -42,7 +42,7 @@ public class ChatRequestEvent extends Event {
String model, List<ChatMessage> messages, @Nullable Object
outputSchema) {
super(EVENT_TYPE);
setAttr("model", model);
- setAttr("messages", messages);
+ setAttr("messages", new ArrayList<>(messages));
if (outputSchema != null) {
setAttr("output_schema", outputSchema);
}
diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java
index 2a52a5d1..35861ce2 100644
--- a/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java
+++ b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java
@@ -40,7 +40,7 @@ public class ContextRetrievalResponseEvent extends Event {
super(EVENT_TYPE);
setAttr("request_id", requestId);
setAttr("query", query);
- setAttr("documents", documents);
+ setAttr("documents", new ArrayList<>(documents));
}
public ContextRetrievalResponseEvent(UUID id, Map<String, Object>
attributes) {
diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/EventKryoSerializationTest.java b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/EventKryoSerializationTest.java
new file mode 100644
index 00000000..8e48d061
--- /dev/null
+++ b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/EventKryoSerializationTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.integration.test;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/**
+ * Verifies that the user-constructed Event classes which carry collection-typed payloads remain
+ * Kryo-friendly when constructed with immutable collections (e.g. List.of(...)).
+ *
+ * <p>On JDK 17+, Flink's Kryo serializer cannot reflectively access internal fields of {@code
+ * java.util.ImmutableCollections} subclasses without {@code --add-opens
+ * java.base/java.util=ALL-UNNAMED}. The fix is to defensive-copy the input collection at event
+ * construction time so the stored collection is always a plain {@link java.util.ArrayList}, which
+ * Kryo handles natively.
+ *
+ * <p>Scope: this test only covers {@link ChatRequestEvent} and {@link
+ * ContextRetrievalResponseEvent}, which are constructed directly by user code. {@code
+ * ToolRequestEvent} and {@code ToolResponseEvent} are intentionally excluded because they are
+ * constructed only by built-in framework actions, not by users, so defensive copies on those events
+ * would only add overhead without protecting any real caller.
+ *
+ * <p>This test asserts the contract directly: the stored collection must be mutable, i.e. {@code
+ * add()} must not throw {@link UnsupportedOperationException}, which is exactly what would happen
+ * if the immutable input were stored by reference.
+ */
+public class EventKryoSerializationTest {
+    // Each test builds an event from an immutable List.of(...) and calls add() on its list.
+    @Test
+    void chatRequestEventDefensiveCopiesImmutableList() {
+        ChatRequestEvent event =
+                new ChatRequestEvent(
+                        "myModel", List.of(new ChatMessage(MessageRole.USER, "hello")));
+        // add() throws UnsupportedOperationException if the List.of(...) input was kept as-is.
+        assertDoesNotThrow(
+                () -> event.getMessages().add(new ChatMessage(MessageRole.USER, "world")),
+                "ChatRequestEvent must defensive-copy its messages list");
+    }
+
+    @Test
+    void contextRetrievalResponseEventDefensiveCopiesImmutableList() {
+        UUID requestId = UUID.randomUUID();
+        ContextRetrievalResponseEvent event =
+                new ContextRetrievalResponseEvent(
+                        requestId,
+                        "what is flink agents?",
+                        List.of(
+                                new Document(
+                                        "Apache Flink Agents is a streaming agent framework.",
+                                        Map.of(),
+                                        "doc-1")));
+        // Same check for documents: an immutable list kept by reference would reject add().
+        assertDoesNotThrow(
+                () -> event.getDocuments().add(new Document("extra", Map.of(), "doc-2")),
+                "ContextRetrievalResponseEvent must defensive-copy its documents list");
+    }
+}