This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 2cb4fb03 [Feature][Java] Add java event listeners config (#641)
2cb4fb03 is described below
commit 2cb4fb038465c2dea792da61f209b3ac80b01dcf
Author: twosom <[email protected]>
AuthorDate: Fri May 15 17:10:22 2026 +0900
[Feature][Java] Add java event listeners config (#641)
Co-authored-by: hope <[email protected]>
---
.../api/configuration/AgentConfigOptions.java | 7 +++
.../flink/agents/api/listener/EventListener.java | 29 ++++++------
docs/content/docs/operations/configuration.md | 4 ++
.../runtime/operator/ActionExecutionOperator.java | 3 ++
.../flink/agents/runtime/operator/EventRouter.java | 31 +++++++++++++
.../operator/ActionExecutionOperatorTest.java | 51 ++++++++++++++++++++++
6 files changed, 112 insertions(+), 13 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index 608b7f78..c39997da 100644
---
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -20,6 +20,8 @@ package org.apache.flink.agents.api.configuration;
import org.apache.flink.agents.api.logger.EventLogLevel;
import org.apache.flink.agents.api.logger.LoggerType;
+import java.util.List;
+
/** The set of configuration options for agents parameters. */
public class AgentConfigOptions {
@@ -135,4 +137,9 @@ public class AgentConfigOptions {
*/
public static final ConfigOption<Integer> EVENT_LOG_MAX_DEPTH =
new ConfigOption<>("event-log.standard.max-depth", Integer.class,
5);
+
+ /**
+ * The config parameter specifies the list of event listener class names.
+ *
+ * <p>The option is built from {@code List.class} and then cast through the raw
+ * {@code ConfigOption} type, because a class literal cannot carry the {@code String}
+ * element type; the suppressed "unchecked" and "rawtypes" warnings come from that cast.
+ * The third constructor argument is {@code null} (presumably the default value, i.e. no
+ * listeners configured; confirm against the {@code ConfigOption} constructor).
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static final ConfigOption<List<String>> EVENT_LISTENERS =
+ (ConfigOption) new ConfigOption<>("event-listeners", List.class,
null);
}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/listener/EventListener.java
b/api/src/main/java/org/apache/flink/agents/api/listener/EventListener.java
index 3668fcf6..6c5b9fb2 100644
--- a/api/src/main/java/org/apache/flink/agents/api/listener/EventListener.java
+++ b/api/src/main/java/org/apache/flink/agents/api/listener/EventListener.java
@@ -22,30 +22,33 @@ import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.EventContext;
/**
- * Interface for event listeners that are notified when events are processed.
+ * Interface for event listeners that are notified when events are received
for processing.
*
- * <p>EventListener provides a callback mechanism triggered after event
processing completes. This
- * is useful for monitoring, metrics collection, debugging, or triggering side
effects based on
- * event processing.
+ * <p>EventListener provides a callback mechanism triggered at the beginning
of event processing.
+ * This is useful for monitoring, metrics collection, debugging, or triggering
side effects based on
+ * event reception.
*
- * <p>Event listeners are executed synchronously after the main event
processing is complete but
- * before the next event is processed. Implementations should be lightweight
and avoid blocking
- * operations to prevent impacting agent performance.
+ * <p>Event listeners are executed synchronously when an event is received,
before any actions are
+ * triggered. Implementations should be lightweight and avoid blocking
operations to prevent
+ * impacting agent performance.
+ *
+ * <p><strong>Note:</strong> Implementing classes must provide a public
no-argument constructor to
+ * allow for dynamic instantiation by the agent.
*/
public interface EventListener {
/**
- * Called when an event has been processed.
+ * Called when an event is being processed.
*
- * <p>This method is invoked after the event has been processed by the
agent's actions. The
- * listener can inspect the event and its context to perform additional
processing such as
- * logging, metrics collection, or triggering external notifications.
+ * <p>This method is invoked when an event is received by the agent,
before it is processed by
+ * any actions. The listener can inspect the event and its context to
perform additional
+ * processing such as logging, metrics collection, or triggering external
notifications.
*
* <p><strong>Important:</strong> This method should not throw exceptions
as they will be caught
* and logged but will not affect the main event processing flow.
Implementations should handle
* their own error recovery.
*
- * @param context The context associated with the event processing
- * @param event The event that was processed
+ * @param context The context associated with the event
+ * @param event The event that is being processed
*/
void onEventProcessed(EventContext context, Event event);
}
diff --git a/docs/content/docs/operations/configuration.md
b/docs/content/docs/operations/configuration.md
index 82e08f7a..ac9fab55 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -72,6 +72,9 @@ Configuration config = agentsEnv.getConfig();
// Set custom configuration using key (direct string key)
config.setInt("kafkaActionStateTopicNumPartitions", 128); // Kafka topic
partitions count
+// Set the list of event listeners
+config.set(AgentConfigOptions.EVENT_LISTENERS,
List.of(MyCustomListener.class.getName()));
+
// Set framework configuration using ConfigOption (predefined option class)
config.set(AgentExecutionOptions.ERROR_HANDLING_STRATEGY,
ErrorHandlingStrategy.RETRY);
```
@@ -129,6 +132,7 @@ Here is the list of all built-in core configuration options.
| `eventLoggerType` | `SLF4J` | LoggerType
| Which built-in event logger to use. Valid values: `SLF4J` (writes JSON
through a dedicated SLF4J logger so events show up in Flink's Web UI **Logs**
tab) and `FILE` (writes per-subtask `.log` files under `baseLogDir`). Setting
`baseLogDir` overrides this and forces `FILE`. |
| `baseLogDir` | (none) | String
| Base directory for file-based event logs. If not set, uses
`java.io.tmpdir/flink-agents`. Setting this value also implicitly switches
`eventLoggerType` to `FILE`.
|
| `prettyPrint` | false | boolean
| Whether to enable pretty-printed JSON format for event logs. When set to
`true`, each event is written as formatted multi-line JSON instead of JSONL
(JSON Lines) format. {{< hint info >}}Note: enabling this option makes the log
file no longer valid JSONL format. {{< /hint >}} |
+| `event-listeners` | (none) | `List<String>`
| The list of event listener class names. Each class must implement the
EventListener interface and provide a public no-argument constructor. {{< hint
warning >}} Note: Currently, custom event listeners are only supported in Java.
{{< /hint >}} |
| `error-handling-strategy` | ErrorHandlingStrategy.FAIL |
ErrorHandlingStrategy | Strategy for handling errors during model requests,
including timeouts and unexpected output schemas. <br/>The option value could
be:<br/> <ul><li>`ErrorHandlingStrategy.FAIL`</li>
<li>`ErrorHandlingStrategy.RETRY`</li> <li>`ErrorHandlingStrategy.IGNORE`</li></ul> |
| `max-retries` | 3 | int
| Number of retries when using `ErrorHandlingStrategy.RETRY`.
|
| `retry-wait-interval` | 1 | int
| Base wait interval in seconds between retries when using
`ErrorHandlingStrategy.RETRY`. Uses exponential backoff: the actual wait time
for the Nth retry is `retry-wait-interval * 2^(N-1)` seconds. For example, with
default 1s, waits are 1s, 2s, 4s, etc. Retry count and total wait time are
reported in `ChatResponseEvent` and recorded as metrics (`retryCount`,
`retryWaitSec`) under the connection name. |
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 719db92d..957798f8 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -182,6 +182,9 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
// Initialize the event logger if it is set.
eventRouter.initEventLogger(getRuntimeContext());
+ // Initialize user event listeners from configuration
+ eventRouter.initEventListeners(getRuntimeContext());
+
// Since an operator restart may change the key range it manages due
to changes in
// parallelism,
// and {@link tryProcessActionTaskForKey} mails might be lost,
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
index 1283c4c6..bc2aef6f 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
@@ -41,6 +41,8 @@ import
org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
import javax.annotation.Nullable;
@@ -48,6 +50,7 @@ import java.util.ArrayList;
import java.util.List;
import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.BASE_LOG_DIR;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.EVENT_LISTENERS;
import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.EVENT_LOGGER_TYPE;
import static org.apache.flink.util.Preconditions.checkState;
@@ -83,6 +86,7 @@ class EventRouter<IN, OUT> implements AutoCloseable {
private final boolean inputIsJava;
private final EventLogger eventLogger;
private final List<EventListener> eventListeners;
+ private final AgentPlan agentPlan;
private StreamRecord<OUT> reusedStreamRecord;
private SegmentedQueue keySegmentQueue;
private BuiltInMetrics builtInMetrics;
@@ -93,6 +97,7 @@ class EventRouter<IN, OUT> implements AutoCloseable {
@VisibleForTesting
EventRouter(AgentPlan agentPlan, boolean inputIsJava, EventLogger
eventLogger) {
+ this.agentPlan = agentPlan;
this.inputIsJava = inputIsJava;
this.eventLogger = eventLogger;
this.eventListeners = new ArrayList<>();
@@ -127,6 +132,32 @@ class EventRouter<IN, OUT> implements AutoCloseable {
}
}
+ /**
+ * Initializes the {@link EventListener}s configured for this agent.
+ *
+ * <p>Reads the listener class names from the agent plan's {@code EVENT_LISTENERS} option,
+ * instantiates each class through the user-code class loader, and appends the instances to
+ * this router's listener list. Returns without doing anything when the option is unset or
+ * empty. Instances are collected in a temporary list first, so nothing is appended if any
+ * instantiation fails.
+ *
+ * @param runtimeContext the runtime context that supplies the user-code class loader.
+ * @throws RuntimeException if any listener class fails to instantiate.
+ */
+ void initEventListeners(StreamingRuntimeContext runtimeContext) {
+ final List<String> eventListenerClassList =
agentPlan.getConfig().get(EVENT_LISTENERS);
+ if (eventListenerClassList == null ||
eventListenerClassList.isEmpty()) {
+ return;
+ }
+
+ final ClassLoader userCodeClassLoader =
runtimeContext.getUserCodeClassLoader();
+ final List<EventListener> eventListeners = new ArrayList<>(); // local staging list; shadows the field of the same name
+ for (String listenerClassName : eventListenerClassList) {
+ try {
+ eventListeners.add(
+ InstantiationUtil.instantiate(
+ listenerClassName, EventListener.class,
userCodeClassLoader));
+ } catch (FlinkException e) {
+ throw new RuntimeException(
+ "Failed to instantiate EventListener: " +
listenerClassName, e);
+ }
+ }
+ this.eventListeners.addAll(eventListeners);
+ }
+
/**
* Wraps an incoming record into an {@link Event} suitable for action
dispatch.
*
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index 2b2f5a29..41cb5109 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -19,12 +19,14 @@ package org.apache.flink.agents.runtime.operator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.EventContext;
import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.api.configuration.AgentConfigOptions;
import org.apache.flink.agents.api.context.DurableCallable;
import org.apache.flink.agents.api.context.MemoryObject;
import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.listener.EventListener;
import org.apache.flink.agents.api.logger.EventLoggerConfig;
import org.apache.flink.agents.api.logger.LoggerType;
import org.apache.flink.agents.plan.AgentConfiguration;
@@ -308,6 +310,55 @@ public class ActionExecutionOperatorTest {
}
}
+ /**
+ * An {@link EventListener} for unit tests that records whether its callback has been
+ * invoked.
+ */
+ public static class TestEventListener implements EventListener {
+ public boolean called = false;
+
+ @Override
+ public void onEventProcessed(EventContext context, Event event) {
+ this.called = true;
+ }
+ }
+
+ @Test
+ void testEventListenersFromAgentConfig() throws Exception {
+ final AgentConfiguration config = new AgentConfiguration();
+ config.set(AgentConfigOptions.EVENT_LISTENERS,
List.of(TestEventListener.class.getName())); // listeners are configured by class name
+ final AgentPlan agentPlan = TestAgent.getAgentPlanWithConfig(config);
+
+ try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object>
testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new ActionExecutionOperatorFactory(agentPlan, true),
+ (KeySelector<Long, Long>) value -> value,
+ TypeInformation.of(Long.class))) {
+ testHarness.open();
+ final ActionExecutionOperator<Long, Object> operator =
+ (ActionExecutionOperator<Long, Object>)
testHarness.getOperator();
+ final Field eventListenersField =
EventRouter.class.getDeclaredField("eventListeners");
+ eventListenersField.setAccessible(true); // the field is private in EventRouter
+ final Object obj =
eventListenersField.get(operator.getEventRouter());
+ assertThat(obj).isNotNull();
+ assertThat(obj).isInstanceOf(List.class);
+
+ final List eventListeners = (List) obj;
+ assertThat(eventListeners.size()).isEqualTo(1);
+
+ final Object listener = eventListeners.get(0);
+ assertThat(listener).isInstanceOf(TestEventListener.class);
+
+ // the listener should not have been triggered yet
+ boolean called = ((TestEventListener) listener).called;
+ assertThat(called).isFalse();
+
+ // process a single element to trigger the operator logic
+ testHarness.processElement(new StreamRecord<>(1L));
+
+ // the listener should have been invoked after element processing
+ called = ((TestEventListener) listener).called;
+ assertThat(called).isTrue();
+ }
+ }
+
@Test
void testDoesNotPruneBeforeCheckpointComplete() throws Exception {
AgentPlan agentPlanWithStateStore = TestAgent.getAgentPlan(false);