This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit 117bc8f0854a498d4f9ee999ba958510f8c04b84 Author: sxnan <[email protected]> AuthorDate: Fri Jan 16 17:32:07 2026 +0800 [log] Add config to set the event log path # Conflicts: # runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java --- docs/content/docs/operations/configuration.md | 1 + docs/content/docs/operations/monitoring.md | 2 +- .../e2e_tests_integration/event_log_test.py | 100 +++++++++++++++++++++ .../runtime/operator/ActionExecutionOperator.java | 13 ++- .../operator/ActionExecutionOperatorTest.java | 47 +++++++++- 5 files changed, 160 insertions(+), 3 deletions(-) diff --git a/docs/content/docs/operations/configuration.md b/docs/content/docs/operations/configuration.md index 2acc1b9a..2ed6f768 100644 --- a/docs/content/docs/operations/configuration.md +++ b/docs/content/docs/operations/configuration.md @@ -191,6 +191,7 @@ Here are the configuration options for Agent execution. | Key | Default | Type | Description | |-------------------------------------|----------------------------|-----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `baseLogDir` | (none) | String | Base directory for file-based event logs. If not set, event logs are written to the `flink-agents` subdirectory under the system temporary directory (`java.io.tmpdir`). | | `error-handling-strategy` | ErrorHandlingStrategy.FAIL | ErrorHandlingStrategy | Strategy for handling errors during model requests, include timeout and unexpected output schema. <br/>The option value could be:<br/> <ul><li>`ErrorHandlingStrategy.FAIL`</li> <li>`ErrorHandlingStrategy.RETRY`</li> <li>`ErrorHandlingStrategy.IGNORE`</li> | | `max-retries` | 3 | int | Number of retries when using `ErrorHandlingStrategy.RETRY`. 
| diff --git a/docs/content/docs/operations/monitoring.md b/docs/content/docs/operations/monitoring.md index 462a7ba1..88e8fbfe 100644 --- a/docs/content/docs/operations/monitoring.md +++ b/docs/content/docs/operations/monitoring.md @@ -158,4 +158,4 @@ The log files follow a naming convention consistent with Flink's logging standar └── events-{jobId}-{taskName}-{subtaskId}.log ``` -By default, all File-based Event Logs are stored in the `flink-agents` subdirectory under the system temporary directory (`java.io.tmpdir`). In future versions, we plan to add a configurable parameter to allow users to customize the base log directory, providing greater control over log storage paths and lifecycle management. +By default, all File-based Event Logs are stored in the `flink-agents` subdirectory under the system temporary directory (`java.io.tmpdir`). You can override the base log directory with the `agent.baseLogDir` setting in Flink `config.yaml`. diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/event_log_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/event_log_test.py new file mode 100644 index 00000000..275f5a73 --- /dev/null +++ b/python/flink_agents/e2e_tests/e2e_tests_integration/event_log_test.py @@ -0,0 +1,100 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import json +import os +import sysconfig +from pathlib import Path + +from pyflink.common import Configuration, Encoder, WatermarkStrategy +from pyflink.common.typeinfo import Types +from pyflink.datastream import RuntimeExecutionMode, StreamExecutionEnvironment +from pyflink.datastream.connectors.file_system import ( + FileSource, + StreamFormat, + StreamingFileSink, +) + +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.e2e_tests.e2e_tests_integration.flink_integration_agent import ( + DataStreamAgent, + ItemData, + MyKeySelector, +) + +current_dir = Path(__file__).parent +os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"] + + +def test_event_log_base_dir_flink(tmp_path: Path) -> None: # noqa: D103 + config = Configuration() + config.set_string("state.backend.type", "rocksdb") + config.set_string("checkpointing.interval", "1s") + config.set_string("restart-strategy.type", "disable") + env = StreamExecutionEnvironment.get_execution_environment(config) + env.set_runtime_mode(RuntimeExecutionMode.STREAMING) + env.set_parallelism(1) + + input_datastream = env.from_source( + source=FileSource.for_record_stream_format( + StreamFormat.text_line_format(), f"file:///{current_dir}/../resources/input" + ).build(), + watermark_strategy=WatermarkStrategy.no_watermarks(), + source_name="event_log_test_source", + ) + + deserialize_datastream = input_datastream.map( + lambda x: ItemData.model_validate_json(x) + ) + + agents_env = 
AgentsExecutionEnvironment.get_execution_environment(env=env) + event_log_dir = tmp_path / "event_log" + agents_env.get_config().set_str("baseLogDir", str(event_log_dir)) + + output_datastream = ( + agents_env.from_datastream( + input=deserialize_datastream, key_selector=MyKeySelector() + ) + .apply(DataStreamAgent()) + .to_datastream() + ) + + result_dir = tmp_path / "results" + result_dir.mkdir(parents=True, exist_ok=True) + output_datastream.map(lambda x: x.model_dump_json(), Types.STRING()).add_sink( + StreamingFileSink.for_row_format( + base_path=str(result_dir.absolute()), + encoder=Encoder.simple_string_encoder(), + ).build() + ) + + agents_env.execute() + + event_logs = list(event_log_dir.glob("events-*.log")) + assert event_logs, "No event log files found in configured baseLogDir." + + first_log = event_logs[0] + record = None + with first_log.open("r", encoding="utf-8") as handle: + for line in handle: + if line.strip(): + record = json.loads(line) + break + + assert record is not None, "Event log file is empty." 
+ assert "context" in record + assert "event" in record diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index 6a6ae880..41b31e69 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -45,6 +45,7 @@ import org.apache.flink.agents.runtime.context.JavaRunnerContextImpl; import org.apache.flink.agents.runtime.context.RunnerContextImpl; import org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment; import org.apache.flink.agents.runtime.env.PythonEnvironmentManager; +import org.apache.flink.agents.runtime.eventlog.FileEventLogger; import org.apache.flink.agents.runtime.memory.CachedMemoryStore; import org.apache.flink.agents.runtime.memory.MemoryObjectImpl; import org.apache.flink.agents.runtime.metrics.BuiltInMetrics; @@ -98,6 +99,7 @@ import java.util.Map; import java.util.Optional; import static org.apache.flink.agents.api.configuration.AgentConfigOptions.ACTION_STATE_STORE_BACKEND; +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.BASE_LOG_DIR; import static org.apache.flink.agents.api.configuration.AgentConfigOptions.JOB_IDENTIFIER; import static org.apache.flink.agents.runtime.actionstate.ActionStateStore.BackendType.KAFKA; import static org.apache.flink.agents.runtime.utils.StateUtil.*; @@ -220,7 +222,7 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT this.inputIsJava = inputIsJava; this.processingTimeService = processingTimeService; this.mailboxExecutor = mailboxExecutor; - this.eventLogger = EventLoggerFactory.createLogger(EventLoggerConfig.builder().build()); + this.eventLogger = createEventLogger(agentPlan); this.eventListeners = new ArrayList<>(); this.actionStateStore = actionStateStore; 
this.checkpointIdToSeqNums = new HashMap<>(); @@ -1077,6 +1079,15 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT } } + private EventLogger createEventLogger(AgentPlan agentPlan) { + EventLoggerConfig.Builder loggerConfigBuilder = EventLoggerConfig.builder(); + String baseLogDir = agentPlan.getConfig().get(BASE_LOG_DIR); + if (baseLogDir != null && !baseLogDir.trim().isEmpty()) { + loggerConfigBuilder.property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, baseLogDir); + } + return EventLoggerFactory.createLogger(loggerConfigBuilder.build()); + } + /** Failed to execute Action task. */ public static class ActionTaskExecutionException extends Exception { public ActionTaskExecutionException(String message, Throwable cause) { diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java index f098acd8..f2709b32 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java @@ -20,14 +20,17 @@ package org.apache.flink.agents.runtime.operator; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.InputEvent; import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.configuration.AgentConfigOptions; import org.apache.flink.agents.api.context.DurableCallable; import org.apache.flink.agents.api.context.MemoryObject; import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.plan.AgentConfiguration; import org.apache.flink.agents.plan.AgentPlan; import org.apache.flink.agents.plan.JavaFunction; import org.apache.flink.agents.plan.actions.Action; import org.apache.flink.agents.runtime.actionstate.ActionState; import 
org.apache.flink.agents.runtime.actionstate.InMemoryActionStateStore; +import org.apache.flink.agents.runtime.eventlog.FileEventLogger; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.watermark.Watermark; @@ -226,6 +229,39 @@ public class ActionExecutionOperatorTest { } } + @Test + void testEventLogBaseDirFromAgentConfig() throws Exception { + String baseLogDir = "/tmp/flink-agents-test"; + AgentConfiguration config = new AgentConfiguration(); + config.set(AgentConfigOptions.BASE_LOG_DIR, baseLogDir); + AgentPlan agentPlan = TestAgent.getAgentPlanWithConfig(config); + + try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + new ActionExecutionOperatorFactory(agentPlan, true), + (KeySelector<Long, Long>) value -> value, + TypeInformation.of(Long.class))) { + testHarness.open(); + ActionExecutionOperator<Long, Object> operator = + (ActionExecutionOperator<Long, Object>) testHarness.getOperator(); + Field eventLoggerField = ActionExecutionOperator.class.getDeclaredField("eventLogger"); + eventLoggerField.setAccessible(true); + Object eventLogger = eventLoggerField.get(operator); + assertThat(eventLogger).isInstanceOf(FileEventLogger.class); + + Field configField = FileEventLogger.class.getDeclaredField("config"); + configField.setAccessible(true); + Object loggerConfig = configField.get(eventLogger); + Field propertiesField = loggerConfig.getClass().getDeclaredField("properties"); + propertiesField.setAccessible(true); + @SuppressWarnings("unchecked") + Map<String, Object> properties = + (Map<String, Object>) propertiesField.get(loggerConfig); + assertThat(properties.get(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY)) + .isEqualTo(baseLogDir); + } + } + @Test void testActionStateStoreContentVerification() throws Exception { AgentPlan agentPlanWithStateStore = TestAgent.getAgentPlan(false); @@ 
-955,6 +991,15 @@ public class ActionExecutionOperatorTest { } public static AgentPlan getAgentPlan(boolean testMemoryAccessOutOfMailbox) { + return getAgentPlanWithConfig(new AgentConfiguration(), testMemoryAccessOutOfMailbox); + } + + public static AgentPlan getAgentPlanWithConfig(AgentConfiguration config) { + return getAgentPlanWithConfig(config, false); + } + + private static AgentPlan getAgentPlanWithConfig( + AgentConfiguration config, boolean testMemoryAccessOutOfMailbox) { try { Map<String, List<Action>> actionsByEvent = new HashMap<>(); Action action1 = @@ -995,7 +1040,7 @@ public class ActionExecutionOperatorTest { actions.put(action3.getName(), action3); } - return new AgentPlan(actions, actionsByEvent, new HashMap<>()); + return new AgentPlan(actions, actionsByEvent, new HashMap<>(), config); } catch (Exception e) { ExceptionUtils.rethrow(e); }
