This is an automated email from the ASF dual-hosted git repository.
sxnan pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 64fe0021 [feature] Support pretty-printed JSON format for FileEvent
logs option (#572)
64fe0021 is described below
commit 64fe002188d7f0bb1b152a497d0ed3e2fbfc9ce5
Author: Eugene <[email protected]>
AuthorDate: Tue Mar 17 17:02:18 2026 +0800
[feature] Support pretty-printed JSON format for FileEvent logs option
(#572)
---
.../api/configuration/AgentConfigOptions.java | 7 +++++
docs/content/docs/operations/configuration.md | 2 ++
docs/content/docs/operations/monitoring.md | 4 ++-
.../agents/runtime/eventlog/FileEventLogger.java | 10 ++++++-
.../runtime/operator/ActionExecutionOperator.java | 3 ++
.../runtime/eventlog/FileEventLoggerTest.java | 33 ++++++++++++++++++++++
.../operator/ActionExecutionOperatorTest.java | 2 ++
7 files changed, 59 insertions(+), 2 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index e4871c66..72424745 100644
---
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -24,6 +24,13 @@ public class AgentConfigOptions {
public static final ConfigOption<String> BASE_LOG_DIR =
new ConfigOption<>("baseLogDir", String.class, null);
+ /**
+ * The config parameter determines whether to enable pretty-printed JSON
format for FileEvent
+ * logs.
+ */
+ public static final ConfigOption<Boolean> PRETTY_PRINT =
+ new ConfigOption<>("prettyPrint", Boolean.class, false);
+
/** The config parameter specifies the backend for action state store. */
public static final ConfigOption<String> ACTION_STATE_STORE_BACKEND =
new ConfigOption<>("actionStateStoreBackend", String.class, null);
diff --git a/docs/content/docs/operations/configuration.md
b/docs/content/docs/operations/configuration.md
index 556a0e6a..4460b548 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -123,9 +123,11 @@ In the following two cases, Flink Agents may not locate
the corresponding config
### Core Options
Here is the list of all built-in core configuration options.
+
| Key | Default | Type
| Description
|
|---------------------------|----------------------------|-----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `baseLogDir` | (none) | String
| Base directory for file-based event logs. If not set, uses
`java.io.tmpdir/flink-agents`.
|
+| `prettyPrint` | false | boolean
| Whether to enable pretty-printed JSON format for event logs. When set to
`true`, each event is written as formatted multi-line JSON instead of JSONL
(JSON Lines) format. {{< hint info >}}Note: enabling this option makes the log
file no longer valid JSONL format. {{< /hint >}} |
| `error-handling-strategy` | ErrorHandlingStrategy.FAIL |
ErrorHandlingStrategy | Strategy for handling errors during model requests,
include timeout and unexpected output schema. <br/>The option value could
be:<br/> <ul><li>`ErrorHandlingStrategy.FAIL`</li>
<li>`ErrorHandlingStrategy.RETRY`</li> <li>`ErrorHandlingStrategy.IGNORE`</li> |
| `max-retries` | 3 | int
| Number of retries when using `ErrorHandlingStrategy.RETRY`.
|
| `chat.async` | true | boolean
| Whether chat asynchronously for built-in chat action.
|
diff --git a/docs/content/docs/operations/monitoring.md
b/docs/content/docs/operations/monitoring.md
index 88e8fbfe..83a92d55 100644
--- a/docs/content/docs/operations/monitoring.md
+++ b/docs/content/docs/operations/monitoring.md
@@ -145,7 +145,9 @@ Currently, the system supports **File-based Event Log** as
the default implement
### File Event Log
-The **File Event Log** is a file-based event logging system that stores events
in structured files within a flat directory. Each event is recorded in **JSON
Lines (JSONL)** format, with one JSON object per line.
+The **File Event Log** is a file-based event logging system that stores events
in structured files within a flat directory.
+
+By default, each event is recorded in **JSON Lines (JSONL)** format, with one
JSON object per line. When [`prettyPrint`]({{< ref
"docs/operations/configuration#core-options" >}}) is enabled, each event is
written as formatted multi-line JSON instead, and the log file is no longer in
valid JSONL format.
#### File Structure
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
index a4d0ed6b..8e781727 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
@@ -76,6 +76,7 @@ import java.nio.file.Paths;
*/
public class FileEventLogger implements EventLogger {
public static final String BASE_LOG_DIR_PROPERTY_KEY = "baseLogDir";
+ public static final String PRETTY_PRINT_PROPERTY_KEY = "prettyPrint";
// The default base log directory if not specified in the configuration
private static final String DEFAULT_BASE_LOG_DIR =
Paths.get(System.getProperty("java.io.tmpdir"),
"flink-agents").toString();
@@ -84,6 +85,7 @@ public class FileEventLogger implements EventLogger {
private final EventLoggerConfig config;
private final EventFilter eventFilter;
+ private boolean prettyPrint;
private PrintWriter writer;
public FileEventLogger(EventLoggerConfig config) {
@@ -101,6 +103,8 @@ public class FileEventLogger implements EventLogger {
}
// Create writer in append mode
writer = new PrintWriter(new BufferedWriter(new
FileWriter(logFilePath, true)));
+ prettyPrint =
+ (Boolean)
config.getProperties().getOrDefault(PRETTY_PRINT_PROPERTY_KEY, false);
}
private String generateSubTaskLogFilePath(EventLoggerOpenParams params) {
@@ -134,7 +138,11 @@ public class FileEventLogger implements EventLogger {
EventLogRecord record = new EventLogRecord(context, event);
// All events should be JSON serializable, since we check it when
sending events to context:
// RunnerContextImpl.sendEvent
- writer.println(MAPPER.writeValueAsString(record));
+ String json =
+ prettyPrint
+ ?
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(record)
+ : MAPPER.writeValueAsString(record);
+ writer.println(json);
}
@Override
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index f4e58efe..68480440 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -101,6 +101,7 @@ import java.util.Optional;
import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.ACTION_STATE_STORE_BACKEND;
import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.BASE_LOG_DIR;
import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.JOB_IDENTIFIER;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.PRETTY_PRINT;
import static
org.apache.flink.agents.runtime.actionstate.ActionStateStore.BackendType.KAFKA;
import static org.apache.flink.agents.runtime.utils.StateUtil.*;
import static org.apache.flink.util.Preconditions.checkState;
@@ -1110,6 +1111,8 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
if (baseLogDir != null && !baseLogDir.trim().isEmpty()) {
loggerConfigBuilder.property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
baseLogDir);
}
+ loggerConfigBuilder.property(
+ FileEventLogger.PRETTY_PRINT_PROPERTY_KEY,
agentPlan.getConfig().get(PRETTY_PRINT));
return EventLoggerFactory.createLogger(loggerConfigBuilder.build());
}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
index 1cd4d0ca..b61d8045 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.agents.api.EventContext;
import org.apache.flink.agents.api.EventFilter;
import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
import org.apache.flink.agents.api.logger.EventLoggerConfig;
import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
import org.apache.flink.api.common.JobID;
@@ -494,6 +495,38 @@ class FileEventLoggerTest {
assertInstanceOf(OutputEvent.class, outputRecord.getEvent());
}
+ @Test
+ void testPrettyPrintOutputsFormattedJson() throws Exception {
+ // Given - config with prettyPrint enabled
+ config =
+ EventLoggerConfig.builder()
+ .loggerType("file")
+ .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
+ .property(AgentConfigOptions.PRETTY_PRINT.getKey(),
true)
+ .build();
+ logger = new FileEventLogger(config);
+
+ logger.open(openParams);
+ InputEvent inputEvent = new InputEvent("test input");
+ logger.append(new EventContext(inputEvent), inputEvent);
+ logger.flush();
+
+ // Then - output should be valid JSON spanning multiple lines
(pretty-printed)
+ Path logFile = getExpectedLogFilePath();
+ List<String> lines = Files.readAllLines(logFile);
+ // Pretty-printed JSON for a single event record spans multiple lines
+ assertTrue(lines.size() > 1, "Pretty-printed JSON should span multiple
lines");
+ // Each line after the first should be indented
+ assertTrue(
+ lines.subList(1, lines.size()).stream().anyMatch(line ->
line.startsWith(" ")),
+ "Pretty-printed JSON lines should be indented");
+ // The entire content should still be valid JSON
+ String content = String.join("\n", lines);
+ assertDoesNotThrow(
+ () -> objectMapper.readValue(content, EventLogRecord.class),
+ "Pretty-printed output should be valid JSON deserializable to
EventLogRecord");
+ }
+
private Path getExpectedLogFilePath() {
return tempDir.resolve(
String.format(
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index 646d1e63..e1254441 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -234,6 +234,7 @@ public class ActionExecutionOperatorTest {
String baseLogDir = "/tmp/flink-agents-test";
AgentConfiguration config = new AgentConfiguration();
config.set(AgentConfigOptions.BASE_LOG_DIR, baseLogDir);
+ config.set(AgentConfigOptions.PRETTY_PRINT, true);
AgentPlan agentPlan = TestAgent.getAgentPlanWithConfig(config);
try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object>
testHarness =
@@ -259,6 +260,7 @@ public class ActionExecutionOperatorTest {
(Map<String, Object>) propertiesField.get(loggerConfig);
assertThat(properties.get(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY))
.isEqualTo(baseLogDir);
+
assertThat(properties.get(FileEventLogger.PRETTY_PRINT_PROPERTY_KEY)).isEqualTo(true);
}
}