joeyutong commented on code in PR #628:
URL: https://github.com/apache/flink-agents/pull/628#discussion_r3151827392
##########
docs/content/docs/operations/configuration.md:
##########
@@ -140,14 +140,35 @@ Here is the list of all built-in core configuration options.
### Action State Store
+#### Common
+
+| Key | Default | Type | Description |
+|------------------------------|------------------|---------|------------------------------------------------------------------------------------------|
+| `actionStateStoreBackend` | (none) | String | The backend for action state store. Supported values: `"kafka"`, `"fluss"`. |
+
#### Kafka-based Action State Store
Here are the configuration options for Kafka-based Action State Store.
| Key | Default | Type | Description |
|-------------------------------------|--------------------------|---------|-----------------------------------------------------------------------------|
-| `actionStateStoreBackend` | (none) | String | The config parameter specifies the backend for action state store. |
| `kafkaBootstrapServers` | "localhost:9092" | String | The config parameter specifies the Kafka bootstrap server. |
| `kafkaActionStateTopic` | (none) | String | The config parameter specifies the Kafka topic for action state. |
| `kafkaActionStateTopicNumPartitions`| 64 | Integer | The config parameter specifies the number of partitions for the Kafka action state topic. |
| `kafkaActionStateTopicReplicationFactor` | 1 | Integer | The config parameter specifies the replication factor for the Kafka action state topic. |
+
+#### Fluss-based Action State Store
+
+Here are the configuration options for Fluss-based Action State Store.
+
+| Key | Default | Type | Description |
+|------------------------------|------------------|---------|------------------------------------------------------------------------------------------|
+| `flussBootstrapServers` | "localhost:9123" | String | The Fluss bootstrap servers address. |
+| `flussActionStateDatabase` | "flink_agents" | String | The Fluss database name for storing action state. |
+| `flussActionStateTable` | "action_state" | String | The Fluss table name for storing action state. |
Review Comment:
```suggestion
| `flussActionStateTable` | "action_states" | String | The Fluss table name for storing action state. |
```
##########
api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java:
##########
@@ -51,6 +51,44 @@ public class AgentConfigOptions {
public static final ConfigOption<Integer>
KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR =
new ConfigOption<>("kafkaActionStateTopicReplicationFactor",
Integer.class, 1);
+ /** The config parameter specifies the Fluss bootstrap servers. */
+ public static final ConfigOption<String> FLUSS_BOOTSTRAP_SERVERS =
+ new ConfigOption<>("flussBootstrapServers", String.class,
"localhost:9123");
+
+ /** The config parameter specifies the Fluss database for action state. */
+ public static final ConfigOption<String> FLUSS_ACTION_STATE_DATABASE =
+ new ConfigOption<>("flussActionStateDatabase", String.class,
"flink_agents");
+
+ /** The config parameter specifies the Fluss table name for action state.
*/
+ public static final ConfigOption<String> FLUSS_ACTION_STATE_TABLE =
+ new ConfigOption<>("flussActionStateTable", String.class,
"action_state");
Review Comment:
```suggestion
new ConfigOption<>("flussActionStateTable", String.class,
"action_states");
```
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ * <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ * <li><b>Write</b>: Append to log + update in-memory HashMap.
+ * <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if
none) to rebuild
+ * in-memory state.
+ * <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a
log/append-only table):
+ *
+ * <pre>
+ * state_key STRING -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES -- ActionState JSON serialized bytes
+ * agent_key STRING -- Original key; used as bucket key so all records
for the same
+ * agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlussActionStateStore.class);
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+ /**
+ * Number of consecutive empty polls before considering the log fully
consumed during recovery.
+ */
+ private static final int EMPTY_POLL_THRESHOLD = 3;
+
+ private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+ /** Creates and configures the ObjectMapper for ActionState serialization.
*/
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+ return mapper;
+ }
+
+ /** Mixin to add type information for Event hierarchy. */
+ @JsonTypeInfo(
+ use = JsonTypeInfo.Id.CLASS,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "@class")
+ abstract static class EventTypeInfoMixin {}
+
+ // Column indices in the Fluss table schema
+ private static final int COL_STATE_KEY = 0;
+ private static final int COL_STATE_PAYLOAD = 1;
+ private static final int COL_AGENT_KEY = 2;
+ private static final int NUM_COLUMNS = 3;
+
+ private final AgentConfiguration agentConfiguration;
+ private final String databaseName;
+ private final String tableName;
+ private final TablePath tablePath;
+
+ private final Connection connection;
+ private final Table table;
+ private final AppendWriter writer;
+
+ /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on
recovery. */
+ private final Map<String, ActionState> actionStates;
+
+ public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+ this.agentConfiguration = agentConfiguration;
+ this.databaseName =
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+ this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+ this.tablePath = TablePath.of(databaseName, tableName);
+ this.actionStates = new HashMap<>();
+
+ Configuration flussConf = new Configuration();
+ flussConf.setString("bootstrap.servers",
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+ String securityProtocol =
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+ if (securityProtocol != null) {
+ flussConf.setString("client.security.protocol", securityProtocol);
+ }
+ String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+ if (saslMechanism != null) {
+ flussConf.setString("client.security.sasl.mechanism",
saslMechanism);
+ }
+ String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+ if (jaasConfig != null) {
+ flussConf.setString("client.security.sasl.jaas.config",
jaasConfig);
+ }
+ String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+ if (username != null) {
+ flussConf.setString("client.security.sasl.username", username);
+ }
+ String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+ if (password != null) {
+ flussConf.setString("client.security.sasl.password", password);
+ }
+
+ this.connection = ConnectionFactory.createConnection(flussConf);
+ maybeCreateDatabaseAndTable();
+ this.table = connection.getTable(tablePath);
+ this.writer = table.newAppend().createWriter();
+
+ LOG.info(
+ "Initialized FlussActionStateStore (log table) with table:
{}.{}",
+ databaseName,
+ tableName);
+ }
+
+ @Override
+ public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
+ throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+ GenericRow row = new GenericRow(NUM_COLUMNS);
+ row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+ row.setField(COL_STATE_PAYLOAD, payload);
+ row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+ // Append to Fluss log for durability, then update in-memory cache.
+ // Synchronous write ensures the record is durable before returning.
+ writer.append(row).get();
+ actionStates.put(stateKey, state);
+
+ LOG.debug("Stored action state: key={}, isCompleted={}", stateKey,
state.isCompleted());
+ }
+
+ @Override
+ public ActionState get(Object key, long seqNum, Action action, Event
event) throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ ActionState state = actionStates.get(stateKey);
+ LOG.debug("Lookup action state: key={}, found={}", stateKey, state !=
null);
+ return state;
+ }
+
+ /**
+ * Rebuilds in-memory state by scanning the Fluss log table. If recovery
markers are provided,
+ * computes the minimum offset per bucket across all markers and
subscribes from those offsets.
+ * Otherwise, subscribes from the beginning. Reads until the log is
exhausted ({@link
+ * #EMPTY_POLL_THRESHOLD} consecutive empty polls). For the same state key
appearing multiple
+ * times in the log, the latest record wins (last-write-wins).
+ *
+ * <p>Note: Unlike the Kafka implementation which skips rebuild on empty
markers, this
+ * implementation scans from the beginning when no markers are provided,
enabling full state
+ * recovery even without prior checkpoints.
+ */
+ @Override
+ public void rebuildState(List<Object> recoveryMarkers) {
+ LOG.info(
+ "Rebuilding action state from Fluss log table with {} recovery
markers",
+ recoveryMarkers.size());
+ actionStates.clear();
+
+ int numBuckets = table.getTableInfo().getNumBuckets();
+
+ // Compute per-bucket start offsets from recovery markers
+ Map<Integer, Long> bucketOffsets = new HashMap<>();
+ for (Object marker : recoveryMarkers) {
+ if (marker instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
+ for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
+ bucketOffsets.merge(entry.getKey(), entry.getValue(),
Math::min);
+ }
+ } else if (marker != null) {
+ LOG.warn(
+ "Ignoring unrecognized recovery marker type: {}",
+ marker.getClass().getName());
+ }
+ }
+
+ try (LogScanner scanner = table.newScan().createLogScanner()) {
+ if (bucketOffsets.isEmpty()) {
+ for (int b = 0; b < numBuckets; b++) {
+ scanner.subscribeFromBeginning(b);
+ }
+ } else {
+ for (Map.Entry<Integer, Long> entry :
bucketOffsets.entrySet()) {
+ scanner.subscribe(entry.getKey(), entry.getValue());
+ }
+ }
+
+ int emptyPollCount = 0;
+ while (emptyPollCount < EMPTY_POLL_THRESHOLD) {
+ ScanRecords records = scanner.poll(POLL_TIMEOUT);
+ if (records.isEmpty()) {
+ emptyPollCount++;
+ } else {
+ emptyPollCount = 0;
+ for (TableBucket bucket : records.buckets()) {
+ for (ScanRecord record : records.records(bucket)) {
+ InternalRow row = record.getRow();
+ String stateKey =
row.getString(COL_STATE_KEY).toString();
+ byte[] payload = row.getBytes(COL_STATE_PAYLOAD);
+ try {
+ ActionState state =
+ OBJECT_MAPPER.readValue(payload,
ActionState.class);
+ actionStates.put(stateKey, state);
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to deserialize action state
for key: {}, skipping",
+ stateKey,
+ e);
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to rebuild state from Fluss log
table", e);
+ }
+
+ LOG.info("Completed rebuilding state, recovered {} states",
actionStates.size());
+ }
+
+ /**
+ * Returns the end offsets of each bucket as a recovery marker. Similar to
Kafka's
+ * implementation, this captures the current log position so that {@link
#rebuildState} can
+ * resume from these offsets instead of scanning from the beginning.
+ */
+ @Override
+ public Object getRecoveryMarker() {
+ int numBuckets = table.getTableInfo().getNumBuckets();
+
+ try (Admin admin = connection.getAdmin()) {
+ List<Integer> buckets = new ArrayList<>();
+ for (int b = 0; b < numBuckets; b++) {
+ buckets.add(b);
+ }
+ return new HashMap<>(
+ admin.listOffsets(tablePath, buckets, new
OffsetSpec.LatestSpec()).all().get());
+ } catch (Exception e) {
+ LOG.error("Failed to get end offsets for Fluss table: {}",
tablePath, e);
+ throw new RuntimeException("Failed to get end offsets for Fluss
table", e);
+ }
+ }
+
+ /**
+ * Evicts pruned states from the in-memory cache. The Fluss log is
append-only; physical cleanup
+ * relies on Fluss log retention configuration.
+ */
+ @Override
+ public void pruneState(Object key, long seqNum) {
+ LOG.debug("Pruning in-memory state for key: {} up to seqNum: {}", key,
seqNum);
+
+ actionStates
+ .entrySet()
+ .removeIf(
+ entry -> {
+ String stateKey = entry.getKey();
+ if (stateKey.startsWith(key.toString() + "_")) {
+ try {
+ List<String> parts =
ActionStateUtil.parseKey(stateKey);
+ if (parts.size() >= 2) {
+ long stateSeqNum =
Long.parseLong(parts.get(1));
+ return stateSeqNum <= seqNum;
+ }
+ } catch (NumberFormatException e) {
+ LOG.warn("Failed to parse seqNum from
state key: {}", stateKey);
+ }
+ }
+ return false;
+ });
+
+ LOG.debug("Pruned in-memory state for key: {} up to seqNum: {}", key,
seqNum);
+ }
+
+ @Override
+ public void close() throws Exception {
+ Exception firstException = null;
+
+ try {
+ if (writer != null) {
+ writer.flush();
+ }
+ } catch (Exception e) {
+ firstException = e;
+ }
+
+ try {
+ if (table != null) {
+ table.close();
+ }
+ } catch (Exception e) {
+ if (firstException == null) {
+ firstException = e;
+ }
+ }
+
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (Exception e) {
+ if (firstException == null) {
+ firstException = e;
+ }
+ }
+
+ if (firstException != null) {
+ throw firstException;
+ }
+ }
+
+ private void maybeCreateDatabaseAndTable() {
+ try (Admin admin = connection.getAdmin()) {
+ if (!admin.databaseExists(databaseName).get()) {
+ admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY,
true).get();
+ LOG.info("Created Fluss database: {}", databaseName);
+ }
+
+ if (!admin.tableExists(tablePath).get()) {
+ // No primaryKey() call — this creates an append-only log
table in Fluss.
+ Schema schema =
+ Schema.newBuilder()
Review Comment:
Could we also define constants for the column names?
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ * <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ * <li><b>Write</b>: Append to log + update in-memory HashMap.
+ * <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if
none) to rebuild
+ * in-memory state.
+ * <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a
log/append-only table):
+ *
+ * <pre>
+ * state_key STRING -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES -- ActionState JSON serialized bytes
+ * agent_key STRING -- Original key; used as bucket key so all records
for the same
+ * agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlussActionStateStore.class);
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+ /**
+ * Number of consecutive empty polls before considering the log fully
consumed during recovery.
+ */
+ private static final int EMPTY_POLL_THRESHOLD = 3;
+
+ private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+ /** Creates and configures the ObjectMapper for ActionState serialization.
*/
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+ return mapper;
+ }
+
+ /** Mixin to add type information for Event hierarchy. */
+ @JsonTypeInfo(
+ use = JsonTypeInfo.Id.CLASS,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "@class")
+ abstract static class EventTypeInfoMixin {}
+
+ // Column indices in the Fluss table schema
+ private static final int COL_STATE_KEY = 0;
+ private static final int COL_STATE_PAYLOAD = 1;
+ private static final int COL_AGENT_KEY = 2;
+ private static final int NUM_COLUMNS = 3;
+
+ private final AgentConfiguration agentConfiguration;
+ private final String databaseName;
+ private final String tableName;
+ private final TablePath tablePath;
+
+ private final Connection connection;
+ private final Table table;
+ private final AppendWriter writer;
+
+ /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on
recovery. */
+ private final Map<String, ActionState> actionStates;
+
+ public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+ this.agentConfiguration = agentConfiguration;
+ this.databaseName =
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+ this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+ this.tablePath = TablePath.of(databaseName, tableName);
+ this.actionStates = new HashMap<>();
+
+ Configuration flussConf = new Configuration();
+ flussConf.setString("bootstrap.servers",
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+ String securityProtocol =
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+ if (securityProtocol != null) {
+ flussConf.setString("client.security.protocol", securityProtocol);
+ }
+ String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+ if (saslMechanism != null) {
+ flussConf.setString("client.security.sasl.mechanism",
saslMechanism);
+ }
+ String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+ if (jaasConfig != null) {
+ flussConf.setString("client.security.sasl.jaas.config",
jaasConfig);
+ }
+ String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+ if (username != null) {
+ flussConf.setString("client.security.sasl.username", username);
+ }
+ String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+ if (password != null) {
+ flussConf.setString("client.security.sasl.password", password);
+ }
+
+ this.connection = ConnectionFactory.createConnection(flussConf);
+ maybeCreateDatabaseAndTable();
+ this.table = connection.getTable(tablePath);
+ this.writer = table.newAppend().createWriter();
+
+ LOG.info(
+ "Initialized FlussActionStateStore (log table) with table:
{}.{}",
+ databaseName,
+ tableName);
+ }
+
+ @Override
+ public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
+ throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+ GenericRow row = new GenericRow(NUM_COLUMNS);
+ row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+ row.setField(COL_STATE_PAYLOAD, payload);
+ row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+ // Append to Fluss log for durability, then update in-memory cache.
+ // Synchronous write ensures the record is durable before returning.
+ writer.append(row).get();
+ actionStates.put(stateKey, state);
+
+ LOG.debug("Stored action state: key={}, isCompleted={}", stateKey,
state.isCompleted());
+ }
+
+ @Override
+ public ActionState get(Object key, long seqNum, Action action, Event
event) throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ ActionState state = actionStates.get(stateKey);
+ LOG.debug("Lookup action state: key={}, found={}", stateKey, state !=
null);
+ return state;
+ }
+
+ /**
+ * Rebuilds in-memory state by scanning the Fluss log table. If recovery
markers are provided,
+ * computes the minimum offset per bucket across all markers and
subscribes from those offsets.
+ * Otherwise, subscribes from the beginning. Reads until the log is
exhausted ({@link
+ * #EMPTY_POLL_THRESHOLD} consecutive empty polls). For the same state key
appearing multiple
+ * times in the log, the latest record wins (last-write-wins).
+ *
+ * <p>Note: Unlike the Kafka implementation which skips rebuild on empty
markers, this
+ * implementation scans from the beginning when no markers are provided,
enabling full state
+ * recovery even without prior checkpoints.
+ */
+ @Override
+ public void rebuildState(List<Object> recoveryMarkers) {
+ LOG.info(
+ "Rebuilding action state from Fluss log table with {} recovery
markers",
+ recoveryMarkers.size());
+ actionStates.clear();
+
+ int numBuckets = table.getTableInfo().getNumBuckets();
+
+ // Compute per-bucket start offsets from recovery markers
+ Map<Integer, Long> bucketOffsets = new HashMap<>();
+ for (Object marker : recoveryMarkers) {
+ if (marker instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
+ for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
+ bucketOffsets.merge(entry.getKey(), entry.getValue(),
Math::min);
+ }
+ } else if (marker != null) {
+ LOG.warn(
+ "Ignoring unrecognized recovery marker type: {}",
+ marker.getClass().getName());
+ }
+ }
+
+ try (LogScanner scanner = table.newScan().createLogScanner()) {
+ if (bucketOffsets.isEmpty()) {
+ for (int b = 0; b < numBuckets; b++) {
+ scanner.subscribeFromBeginning(b);
+ }
+ } else {
+ for (Map.Entry<Integer, Long> entry :
bucketOffsets.entrySet()) {
+ scanner.subscribe(entry.getKey(), entry.getValue());
+ }
+ }
+
+ int emptyPollCount = 0;
+ while (emptyPollCount < EMPTY_POLL_THRESHOLD) {
+ ScanRecords records = scanner.poll(POLL_TIMEOUT);
+ if (records.isEmpty()) {
+ emptyPollCount++;
+ } else {
+ emptyPollCount = 0;
+ for (TableBucket bucket : records.buckets()) {
+ for (ScanRecord record : records.records(bucket)) {
+ InternalRow row = record.getRow();
+ String stateKey =
row.getString(COL_STATE_KEY).toString();
+ byte[] payload = row.getBytes(COL_STATE_PAYLOAD);
+ try {
+ ActionState state =
+ OBJECT_MAPPER.readValue(payload,
ActionState.class);
+ actionStates.put(stateKey, state);
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to deserialize action state
for key: {}, skipping",
+ stateKey,
+ e);
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to rebuild state from Fluss log
table", e);
+ }
+
+ LOG.info("Completed rebuilding state, recovered {} states",
actionStates.size());
+ }
+
+ /**
+ * Returns the end offsets of each bucket as a recovery marker. Similar to
Kafka's
+ * implementation, this captures the current log position so that {@link
#rebuildState} can
+ * resume from these offsets instead of scanning from the beginning.
+ */
+ @Override
+ public Object getRecoveryMarker() {
+ int numBuckets = table.getTableInfo().getNumBuckets();
+
+ try (Admin admin = connection.getAdmin()) {
+ List<Integer> buckets = new ArrayList<>();
+ for (int b = 0; b < numBuckets; b++) {
+ buckets.add(b);
+ }
+ return new HashMap<>(
Review Comment:
Nit: do we need the extra `new HashMap<>(...)` here? Since the method
returns `Object`, the map from `listOffsets(...).all().get()` can be returned
directly unless a defensive copy is intentional.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log
table as the backend.
Review Comment:
Could we simplify this class-level comment to focus on the current contract
instead of the implementation history? The `HashMap` sentence and the “Compared
to the previous KV-table implementation” section feel a bit redundant. Also,
`agent_key` is the original input key, so “same agent” should probably be “same
input key”.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ * <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ * <li><b>Write</b>: Append to log + update in-memory HashMap.
+ * <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if
none) to rebuild
+ * in-memory state.
+ * <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a
log/append-only table):
+ *
+ * <pre>
+ * state_key STRING -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES -- ActionState JSON serialized bytes
+ * agent_key STRING -- Original key; used as bucket key so all records
for the same
+ * agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlussActionStateStore.class);
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
Review Comment:
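Could we reuse the existing `ActionStateKafkaSeder` serde logic here instead of defining a separate `ObjectMapper` and `EventTypeInfoMixin` in this class? Sharing one serializer would keep the `ActionState` payload format consistent between the Kafka and Fluss backends.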
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ * <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ * <li><b>Write</b>: Append to log + update in-memory HashMap.
+ * <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if
none) to rebuild
+ * in-memory state.
+ * <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a
log/append-only table):
+ *
+ * <pre>
+ * state_key STRING -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES -- ActionState JSON serialized bytes
+ * agent_key STRING -- Original key; used as bucket key so all records
for the same
+ * agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlussActionStateStore.class);
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+ /**
+ * Number of consecutive empty polls before considering the log fully
consumed during recovery.
+ */
+ private static final int EMPTY_POLL_THRESHOLD = 3;
+
+ private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+ /** Creates and configures the ObjectMapper for ActionState serialization.
*/
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+ return mapper;
+ }
+
+ /** Mixin to add type information for Event hierarchy. */
+ @JsonTypeInfo(
+ use = JsonTypeInfo.Id.CLASS,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "@class")
+ abstract static class EventTypeInfoMixin {}
+
+ // Column indices in the Fluss table schema
+ private static final int COL_STATE_KEY = 0;
+ private static final int COL_STATE_PAYLOAD = 1;
+ private static final int COL_AGENT_KEY = 2;
+ private static final int NUM_COLUMNS = 3;
+
+ private final AgentConfiguration agentConfiguration;
+ private final String databaseName;
+ private final String tableName;
+ private final TablePath tablePath;
+
+ private final Connection connection;
+ private final Table table;
+ private final AppendWriter writer;
+
+ /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on
recovery. */
+ private final Map<String, ActionState> actionStates;
+
+ public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+ this.agentConfiguration = agentConfiguration;
+ this.databaseName =
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+ this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+ this.tablePath = TablePath.of(databaseName, tableName);
+ this.actionStates = new HashMap<>();
+
+ Configuration flussConf = new Configuration();
+ flussConf.setString("bootstrap.servers",
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+ String securityProtocol =
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+ if (securityProtocol != null) {
+ flussConf.setString("client.security.protocol", securityProtocol);
+ }
+ String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+ if (saslMechanism != null) {
+ flussConf.setString("client.security.sasl.mechanism",
saslMechanism);
+ }
+ String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+ if (jaasConfig != null) {
+ flussConf.setString("client.security.sasl.jaas.config",
jaasConfig);
+ }
+ String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+ if (username != null) {
+ flussConf.setString("client.security.sasl.username", username);
+ }
+ String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+ if (password != null) {
+ flussConf.setString("client.security.sasl.password", password);
+ }
+
+ this.connection = ConnectionFactory.createConnection(flussConf);
+ maybeCreateDatabaseAndTable();
+ this.table = connection.getTable(tablePath);
+ this.writer = table.newAppend().createWriter();
+
+ LOG.info(
+ "Initialized FlussActionStateStore (log table) with table:
{}.{}",
+ databaseName,
+ tableName);
+ }
+
+ @Override
+ public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
+ throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+ GenericRow row = new GenericRow(NUM_COLUMNS);
+ row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+ row.setField(COL_STATE_PAYLOAD, payload);
+ row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+ // Append to Fluss log for durability, then update in-memory cache.
+ // Synchronous write ensures the record is durable before returning.
+ writer.append(row).get();
+ actionStates.put(stateKey, state);
+
+ LOG.debug("Stored action state: key={}, isCompleted={}", stateKey,
state.isCompleted());
+ }
+
+ @Override
+ public ActionState get(Object key, long seqNum, Action action, Event
event) throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
Review Comment:
Could we also handle divergence in `get()`? `KafkaActionStateStore#get()`
calls `checkDivergence(key, seqNum)` and removes cached states beyond the
requested sequence when divergence is detected. The Fluss backend currently
only does a direct map lookup, which makes the two backends behave differently
during divergent replay.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ * <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ * <li><b>Write</b>: Append to log + update in-memory HashMap.
+ * <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if
none) to rebuild
+ * in-memory state.
+ * <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a
log/append-only table):
+ *
+ * <pre>
+ * state_key STRING -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES -- ActionState JSON serialized bytes
+ * agent_key STRING -- Original key; used as bucket key so all records
for the same
+ * agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlussActionStateStore.class);
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+ /**
+ * Number of consecutive empty polls before considering the log fully
consumed during recovery.
+ */
+ private static final int EMPTY_POLL_THRESHOLD = 3;
+
+ private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+ /** Creates and configures the ObjectMapper for ActionState serialization.
*/
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+ return mapper;
+ }
+
+ /** Mixin to add type information for Event hierarchy. */
+ @JsonTypeInfo(
+ use = JsonTypeInfo.Id.CLASS,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "@class")
+ abstract static class EventTypeInfoMixin {}
+
+ // Column indices in the Fluss table schema
+ private static final int COL_STATE_KEY = 0;
+ private static final int COL_STATE_PAYLOAD = 1;
+ private static final int COL_AGENT_KEY = 2;
+ private static final int NUM_COLUMNS = 3;
+
+ private final AgentConfiguration agentConfiguration;
+ private final String databaseName;
+ private final String tableName;
+ private final TablePath tablePath;
+
+ private final Connection connection;
+ private final Table table;
+ private final AppendWriter writer;
+
+ /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on
recovery. */
+ private final Map<String, ActionState> actionStates;
+
+ public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+ this.agentConfiguration = agentConfiguration;
+ this.databaseName =
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+ this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+ this.tablePath = TablePath.of(databaseName, tableName);
+ this.actionStates = new HashMap<>();
+
+ Configuration flussConf = new Configuration();
+ flussConf.setString("bootstrap.servers",
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+ String securityProtocol =
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+ if (securityProtocol != null) {
+ flussConf.setString("client.security.protocol", securityProtocol);
+ }
+ String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+ if (saslMechanism != null) {
+ flussConf.setString("client.security.sasl.mechanism",
saslMechanism);
+ }
+ String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+ if (jaasConfig != null) {
+ flussConf.setString("client.security.sasl.jaas.config",
jaasConfig);
+ }
+ String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+ if (username != null) {
+ flussConf.setString("client.security.sasl.username", username);
+ }
+ String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+ if (password != null) {
+ flussConf.setString("client.security.sasl.password", password);
+ }
+
+ this.connection = ConnectionFactory.createConnection(flussConf);
+ maybeCreateDatabaseAndTable();
+ this.table = connection.getTable(tablePath);
+ this.writer = table.newAppend().createWriter();
+
+ LOG.info(
+ "Initialized FlussActionStateStore (log table) with table:
{}.{}",
+ databaseName,
+ tableName);
+ }
+
+ @Override
+ public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
+ throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+ GenericRow row = new GenericRow(NUM_COLUMNS);
+ row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+ row.setField(COL_STATE_PAYLOAD, payload);
+ row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+ // Append to Fluss log for durability, then update in-memory cache.
+ // Synchronous write ensures the record is durable before returning.
+ writer.append(row).get();
+ actionStates.put(stateKey, state);
+
+ LOG.debug("Stored action state: key={}, isCompleted={}", stateKey,
state.isCompleted());
+ }
+
+ @Override
+ public ActionState get(Object key, long seqNum, Action action, Event
event) throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ ActionState state = actionStates.get(stateKey);
+ LOG.debug("Lookup action state: key={}, found={}", stateKey, state !=
null);
+ return state;
+ }
+
+ /**
+ * Rebuilds in-memory state by scanning the Fluss log table. If recovery
markers are provided,
+ * computes the minimum offset per bucket across all markers and
subscribes from those offsets.
+ * Otherwise, subscribes from the beginning. Reads until the log is
exhausted ({@link
+ * #EMPTY_POLL_THRESHOLD} consecutive empty polls). For the same state key
appearing multiple
+ * times in the log, the latest record wins (last-write-wins).
+ *
+ * <p>Note: Unlike the Kafka implementation which skips rebuild on empty
markers, this
Review Comment:
Should we keep this behavior aligned with the Kafka backend? When
`recoveryMarkers` or the derived bucket offsets are empty, Kafka skips rebuild
because there is no checkpointed external-store position to recover from.
Scanning the Fluss log from the beginning can restore state outside the Flink
checkpoint boundary, for example on a fresh start or when the table contains
retained data from a previous run. It also hides missing-marker issues and
makes recovery cost depend on the full retained log.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ * <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ * <li><b>Write</b>: Append to log + update in-memory HashMap.
+ * <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if
none) to rebuild
+ * in-memory state.
+ * <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a
log/append-only table):
+ *
+ * <pre>
+ * state_key STRING -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES -- ActionState JSON serialized bytes
+ * agent_key STRING -- Original key; used as bucket key so all records
for the same
+ * agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlussActionStateStore.class);
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+ /**
+ * Number of consecutive empty polls before considering the log fully
consumed during recovery.
+ */
+ private static final int EMPTY_POLL_THRESHOLD = 3;
+
+ private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+ /** Creates and configures the ObjectMapper for ActionState serialization.
*/
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+ return mapper;
+ }
+
+ /** Mixin to add type information for Event hierarchy. */
+ @JsonTypeInfo(
+ use = JsonTypeInfo.Id.CLASS,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "@class")
+ abstract static class EventTypeInfoMixin {}
+
+ // Column indices in the Fluss table schema
+ private static final int COL_STATE_KEY = 0;
+ private static final int COL_STATE_PAYLOAD = 1;
+ private static final int COL_AGENT_KEY = 2;
+ private static final int NUM_COLUMNS = 3;
+
+ private final AgentConfiguration agentConfiguration;
+ private final String databaseName;
+ private final String tableName;
+ private final TablePath tablePath;
+
+ private final Connection connection;
+ private final Table table;
+ private final AppendWriter writer;
+
+ /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on
recovery. */
+ private final Map<String, ActionState> actionStates;
+
+ public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+ this.agentConfiguration = agentConfiguration;
+ this.databaseName =
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+ this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+ this.tablePath = TablePath.of(databaseName, tableName);
+ this.actionStates = new HashMap<>();
+
+ Configuration flussConf = new Configuration();
+ flussConf.setString("bootstrap.servers",
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+ String securityProtocol =
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+ if (securityProtocol != null) {
+ flussConf.setString("client.security.protocol", securityProtocol);
+ }
+ String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+ if (saslMechanism != null) {
+ flussConf.setString("client.security.sasl.mechanism",
saslMechanism);
+ }
+ String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+ if (jaasConfig != null) {
+ flussConf.setString("client.security.sasl.jaas.config",
jaasConfig);
+ }
+ String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+ if (username != null) {
+ flussConf.setString("client.security.sasl.username", username);
+ }
+ String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+ if (password != null) {
+ flussConf.setString("client.security.sasl.password", password);
+ }
+
+ this.connection = ConnectionFactory.createConnection(flussConf);
+ maybeCreateDatabaseAndTable();
+ this.table = connection.getTable(tablePath);
+ this.writer = table.newAppend().createWriter();
+
+ LOG.info(
+ "Initialized FlussActionStateStore (log table) with table:
{}.{}",
+ databaseName,
+ tableName);
+ }
+
+ @Override
+ public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
+ throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+ GenericRow row = new GenericRow(NUM_COLUMNS);
Review Comment:
Nit: this could be simplified with `GenericRow.of(...)`. The row has only
three fixed fields, so constructing it in one expression may be easier to read
than allocating by column count and setting each field separately.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ * <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ * <li><b>Write</b>: Append to log + update in-memory HashMap.
+ * <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if
none) to rebuild
+ * in-memory state.
+ * <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a
log/append-only table):
+ *
+ * <pre>
+ * state_key STRING -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES -- ActionState JSON serialized bytes
+ * agent_key STRING -- Original key; used as bucket key so all records
for the same
+ * agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlussActionStateStore.class);
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+ /**
+ * Number of consecutive empty polls before considering the log fully
consumed during recovery.
+ */
+ private static final int EMPTY_POLL_THRESHOLD = 3;
+
+ private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+ /** Creates and configures the ObjectMapper for ActionState serialization.
*/
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+ return mapper;
+ }
+
+ /** Mixin to add type information for Event hierarchy. */
+ @JsonTypeInfo(
+ use = JsonTypeInfo.Id.CLASS,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "@class")
+ abstract static class EventTypeInfoMixin {}
+
+ // Column indices in the Fluss table schema
+ private static final int COL_STATE_KEY = 0;
+ private static final int COL_STATE_PAYLOAD = 1;
+ private static final int COL_AGENT_KEY = 2;
+ private static final int NUM_COLUMNS = 3;
+
+ private final AgentConfiguration agentConfiguration;
+ private final String databaseName;
+ private final String tableName;
+ private final TablePath tablePath;
+
+ private final Connection connection;
+ private final Table table;
+ private final AppendWriter writer;
+
+ /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on
recovery. */
+ private final Map<String, ActionState> actionStates;
+
+ public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+ this.agentConfiguration = agentConfiguration;
+ this.databaseName =
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+ this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+ this.tablePath = TablePath.of(databaseName, tableName);
+ this.actionStates = new HashMap<>();
+
+ Configuration flussConf = new Configuration();
+ flussConf.setString("bootstrap.servers",
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+ String securityProtocol =
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+ if (securityProtocol != null) {
+ flussConf.setString("client.security.protocol", securityProtocol);
+ }
+ String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+ if (saslMechanism != null) {
+ flussConf.setString("client.security.sasl.mechanism",
saslMechanism);
+ }
+ String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+ if (jaasConfig != null) {
+ flussConf.setString("client.security.sasl.jaas.config",
jaasConfig);
+ }
+ String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+ if (username != null) {
+ flussConf.setString("client.security.sasl.username", username);
+ }
+ String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+ if (password != null) {
+ flussConf.setString("client.security.sasl.password", password);
+ }
+
+ this.connection = ConnectionFactory.createConnection(flussConf);
+ maybeCreateDatabaseAndTable();
+ this.table = connection.getTable(tablePath);
+ this.writer = table.newAppend().createWriter();
+
+ LOG.info(
+ "Initialized FlussActionStateStore (log table) with table:
{}.{}",
+ databaseName,
+ tableName);
+ }
+
+ @Override
+ public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
+ throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+ GenericRow row = new GenericRow(NUM_COLUMNS);
+ row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+ row.setField(COL_STATE_PAYLOAD, payload);
+ row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+ // Append to Fluss log for durability, then update in-memory cache.
+ // Synchronous write ensures the record is durable before returning.
+ writer.append(row).get();
+ actionStates.put(stateKey, state);
+
+ LOG.debug("Stored action state: key={}, isCompleted={}", stateKey,
state.isCompleted());
+ }
+
+ @Override
+ public ActionState get(Object key, long seqNum, Action action, Event
event) throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ ActionState state = actionStates.get(stateKey);
+ LOG.debug("Lookup action state: key={}, found={}", stateKey, state !=
null);
+ return state;
+ }
+
+ /**
+ * Rebuilds in-memory state by scanning the Fluss log table. If recovery
markers are provided,
+ * computes the minimum offset per bucket across all markers and
subscribes from those offsets.
+ * Otherwise, subscribes from the beginning. Reads until the log is
exhausted ({@link
+ * #EMPTY_POLL_THRESHOLD} consecutive empty polls). For the same state key
appearing multiple
+ * times in the log, the latest record wins (last-write-wins).
+ *
+ * <p>Note: Unlike the Kafka implementation which skips rebuild on empty
markers, this
+ * implementation scans from the beginning when no markers are provided,
enabling full state
+ * recovery even without prior checkpoints.
+ */
+ @Override
+ public void rebuildState(List<Object> recoveryMarkers) {
+ LOG.info(
+ "Rebuilding action state from Fluss log table with {} recovery
markers",
+ recoveryMarkers.size());
+ actionStates.clear();
+
+ int numBuckets = table.getTableInfo().getNumBuckets();
+
+ // Compute per-bucket start offsets from recovery markers
+ Map<Integer, Long> bucketOffsets = new HashMap<>();
+ for (Object marker : recoveryMarkers) {
+ if (marker instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
+ for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
+ bucketOffsets.merge(entry.getKey(), entry.getValue(),
Math::min);
+ }
+ } else if (marker != null) {
+ LOG.warn(
+ "Ignoring unrecognized recovery marker type: {}",
+ marker.getClass().getName());
+ }
+ }
+
+ try (LogScanner scanner = table.newScan().createLogScanner()) {
+ if (bucketOffsets.isEmpty()) {
+ for (int b = 0; b < numBuckets; b++) {
+ scanner.subscribeFromBeginning(b);
+ }
+ } else {
+ for (Map.Entry<Integer, Long> entry :
bucketOffsets.entrySet()) {
+ scanner.subscribe(entry.getKey(), entry.getValue());
+ }
+ }
+
+ int emptyPollCount = 0;
+ while (emptyPollCount < EMPTY_POLL_THRESHOLD) {
Review Comment:
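Could we use explicit stopping offsets instead of consecutive empty polls to
terminate the rebuild? A few consecutive empty polls can also be caused by a
transient broker or network stall, which would end recovery early and silently
drop state. AFAIK, the rebuild can instead define a precise replay window: read
from the marker offset up to the **latest offset** captured at rebuild start,
e.g. `admin.listOffsets(tablePath, buckets, new
OffsetSpec.LatestSpec()).all().get()`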
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ * <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ * <li><b>Write</b>: Append to log + update in-memory HashMap.
+ * <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if
none) to rebuild
+ * in-memory state.
+ * <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a
log/append-only table):
+ *
+ * <pre>
+ * state_key STRING -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES -- ActionState JSON serialized bytes
+ * agent_key STRING -- Original key; used as bucket key so all records
for the same
+ * agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlussActionStateStore.class);
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+ /**
+ * Number of consecutive empty polls before considering the log fully
consumed during recovery.
+ */
+ private static final int EMPTY_POLL_THRESHOLD = 3;
+
+ private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+ /** Creates and configures the ObjectMapper for ActionState serialization.
*/
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+ return mapper;
+ }
+
+ /** Mixin to add type information for Event hierarchy. */
+ @JsonTypeInfo(
+ use = JsonTypeInfo.Id.CLASS,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "@class")
+ abstract static class EventTypeInfoMixin {}
+
+ // Column indices in the Fluss table schema
+ private static final int COL_STATE_KEY = 0;
+ private static final int COL_STATE_PAYLOAD = 1;
+ private static final int COL_AGENT_KEY = 2;
+ private static final int NUM_COLUMNS = 3;
+
+ private final AgentConfiguration agentConfiguration;
+ private final String databaseName;
+ private final String tableName;
+ private final TablePath tablePath;
+
+ private final Connection connection;
+ private final Table table;
+ private final AppendWriter writer;
+
+ /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on
recovery. */
+ private final Map<String, ActionState> actionStates;
+
+ public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+ this.agentConfiguration = agentConfiguration;
+ this.databaseName =
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+ this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+ this.tablePath = TablePath.of(databaseName, tableName);
+ this.actionStates = new HashMap<>();
+
+ Configuration flussConf = new Configuration();
+ flussConf.setString("bootstrap.servers",
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+ String securityProtocol =
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+ if (securityProtocol != null) {
+ flussConf.setString("client.security.protocol", securityProtocol);
+ }
+ String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+ if (saslMechanism != null) {
+ flussConf.setString("client.security.sasl.mechanism",
saslMechanism);
+ }
+ String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+ if (jaasConfig != null) {
+ flussConf.setString("client.security.sasl.jaas.config",
jaasConfig);
+ }
+ String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+ if (username != null) {
+ flussConf.setString("client.security.sasl.username", username);
+ }
+ String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+ if (password != null) {
+ flussConf.setString("client.security.sasl.password", password);
+ }
+
+ this.connection = ConnectionFactory.createConnection(flussConf);
+ maybeCreateDatabaseAndTable();
+ this.table = connection.getTable(tablePath);
+ this.writer = table.newAppend().createWriter();
+
+ LOG.info(
+ "Initialized FlussActionStateStore (log table) with table:
{}.{}",
+ databaseName,
+ tableName);
+ }
+
+ @Override
+ public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
+ throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+ GenericRow row = new GenericRow(NUM_COLUMNS);
+ row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+ row.setField(COL_STATE_PAYLOAD, payload);
+ row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+ // Append to Fluss log for durability, then update in-memory cache.
+ // Synchronous write ensures the record is durable before returning.
+ writer.append(row).get();
Review Comment:
For the current synchronous `put()` semantics, should we set
`WRITER_BATCH_TIMEOUT` to `0`? Since `put()` calls `writer.append(row).get()`,
each put waits for the append future to complete. With a non-zero batch
timeout, the batching delay becomes part of the user-visible put latency.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ * <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ * <li><b>Write</b>: Append to log + update in-memory HashMap.
+ * <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if
none) to rebuild
+ * in-memory state.
+ * <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a
log/append-only table):
+ *
+ * <pre>
+ * state_key STRING -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES -- ActionState JSON serialized bytes
+ * agent_key STRING -- Original key; used as bucket key so all records
for the same
+ * agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlussActionStateStore.class);
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+ /**
+ * Number of consecutive empty polls before considering the log fully
consumed during recovery.
+ */
+ private static final int EMPTY_POLL_THRESHOLD = 3;
+
+ private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+ /** Creates and configures the ObjectMapper for ActionState serialization.
*/
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+ return mapper;
+ }
+
+ /** Mixin to add type information for Event hierarchy. */
+ @JsonTypeInfo(
+ use = JsonTypeInfo.Id.CLASS,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "@class")
+ abstract static class EventTypeInfoMixin {}
+
+ // Column indices in the Fluss table schema
+ private static final int COL_STATE_KEY = 0;
+ private static final int COL_STATE_PAYLOAD = 1;
+ private static final int COL_AGENT_KEY = 2;
+ private static final int NUM_COLUMNS = 3;
+
+ private final AgentConfiguration agentConfiguration;
+ private final String databaseName;
+ private final String tableName;
+ private final TablePath tablePath;
+
+ private final Connection connection;
+ private final Table table;
+ private final AppendWriter writer;
+
+ /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on
recovery. */
+ private final Map<String, ActionState> actionStates;
+
+ public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+ this.agentConfiguration = agentConfiguration;
+ this.databaseName =
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+ this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+ this.tablePath = TablePath.of(databaseName, tableName);
+ this.actionStates = new HashMap<>();
+
+ Configuration flussConf = new Configuration();
+ flussConf.setString("bootstrap.servers",
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+ String securityProtocol =
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+ if (securityProtocol != null) {
+ flussConf.setString("client.security.protocol", securityProtocol);
+ }
+ String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+ if (saslMechanism != null) {
+ flussConf.setString("client.security.sasl.mechanism",
saslMechanism);
+ }
+ String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+ if (jaasConfig != null) {
+ flussConf.setString("client.security.sasl.jaas.config",
jaasConfig);
+ }
+ String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+ if (username != null) {
+ flussConf.setString("client.security.sasl.username", username);
+ }
+ String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+ if (password != null) {
+ flussConf.setString("client.security.sasl.password", password);
+ }
+
+ this.connection = ConnectionFactory.createConnection(flussConf);
+ maybeCreateDatabaseAndTable();
+ this.table = connection.getTable(tablePath);
+ this.writer = table.newAppend().createWriter();
+
+ LOG.info(
+ "Initialized FlussActionStateStore (log table) with table:
{}.{}",
+ databaseName,
+ tableName);
+ }
+
+ @Override
+ public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
+ throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+ GenericRow row = new GenericRow(NUM_COLUMNS);
+ row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+ row.setField(COL_STATE_PAYLOAD, payload);
+ row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+ // Append to Fluss log for durability, then update in-memory cache.
+ // Synchronous write ensures the record is durable before returning.
+ writer.append(row).get();
+ actionStates.put(stateKey, state);
+
+ LOG.debug("Stored action state: key={}, isCompleted={}", stateKey,
state.isCompleted());
+ }
+
+ @Override
+ public ActionState get(Object key, long seqNum, Action action, Event
event) throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ ActionState state = actionStates.get(stateKey);
+ LOG.debug("Lookup action state: key={}, found={}", stateKey, state !=
null);
+ return state;
+ }
+
+ /**
+ * Rebuilds in-memory state by scanning the Fluss log table. If recovery
markers are provided,
+ * computes the minimum offset per bucket across all markers and
subscribes from those offsets.
+ * Otherwise, subscribes from the beginning. Reads until the log is
exhausted ({@link
+ * #EMPTY_POLL_THRESHOLD} consecutive empty polls). For the same state key
appearing multiple
+ * times in the log, the latest record wins (last-write-wins).
+ *
+ * <p>Note: Unlike the Kafka implementation which skips rebuild on empty
markers, this
+ * implementation scans from the beginning when no markers are provided,
enabling full state
+ * recovery even without prior checkpoints.
+ */
+ @Override
+ public void rebuildState(List<Object> recoveryMarkers) {
+ LOG.info(
+ "Rebuilding action state from Fluss log table with {} recovery
markers",
+ recoveryMarkers.size());
+ actionStates.clear();
+
+ int numBuckets = table.getTableInfo().getNumBuckets();
+
+ // Compute per-bucket start offsets from recovery markers
+ Map<Integer, Long> bucketOffsets = new HashMap<>();
+ for (Object marker : recoveryMarkers) {
+ if (marker instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
+ for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
+ bucketOffsets.merge(entry.getKey(), entry.getValue(),
Math::min);
+ }
+ } else if (marker != null) {
+ LOG.warn(
+ "Ignoring unrecognized recovery marker type: {}",
+ marker.getClass().getName());
+ }
+ }
+
+ try (LogScanner scanner = table.newScan().createLogScanner()) {
+ if (bucketOffsets.isEmpty()) {
+ for (int b = 0; b < numBuckets; b++) {
+ scanner.subscribeFromBeginning(b);
+ }
+ } else {
+ for (Map.Entry<Integer, Long> entry :
bucketOffsets.entrySet()) {
+ scanner.subscribe(entry.getKey(), entry.getValue());
+ }
+ }
+
+ int emptyPollCount = 0;
+ while (emptyPollCount < EMPTY_POLL_THRESHOLD) {
+ ScanRecords records = scanner.poll(POLL_TIMEOUT);
+ if (records.isEmpty()) {
+ emptyPollCount++;
+ } else {
+ emptyPollCount = 0;
+ for (TableBucket bucket : records.buckets()) {
+ for (ScanRecord record : records.records(bucket)) {
+ InternalRow row = record.getRow();
+ String stateKey =
row.getString(COL_STATE_KEY).toString();
+ byte[] payload = row.getBytes(COL_STATE_PAYLOAD);
+ try {
+ ActionState state =
+ OBJECT_MAPPER.readValue(payload,
ActionState.class);
+ actionStates.put(stateKey, state);
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to deserialize action state
for key: {}, skipping",
+ stateKey,
+ e);
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to rebuild state from Fluss log
table", e);
+ }
+
+ LOG.info("Completed rebuilding state, recovered {} states",
actionStates.size());
+ }
+
+ /**
+ * Returns the end offsets of each bucket as a recovery marker. Similar to
Kafka's
+ * implementation, this captures the current log position so that {@link
#rebuildState} can
+ * resume from these offsets instead of scanning from the beginning.
+ */
+ @Override
+ public Object getRecoveryMarker() {
+ int numBuckets = table.getTableInfo().getNumBuckets();
+
+ try (Admin admin = connection.getAdmin()) {
+ List<Integer> buckets = new ArrayList<>();
+ for (int b = 0; b < numBuckets; b++) {
+ buckets.add(b);
+ }
+ return new HashMap<>(
+ admin.listOffsets(tablePath, buckets, new
OffsetSpec.LatestSpec()).all().get());
+ } catch (Exception e) {
+ LOG.error("Failed to get end offsets for Fluss table: {}",
tablePath, e);
+ throw new RuntimeException("Failed to get end offsets for Fluss
table", e);
+ }
+ }
+
+ /**
+ * Evicts pruned states from the in-memory cache. The Fluss log is
append-only; physical cleanup
+ * relies on Fluss log retention configuration.
+ */
+ @Override
+ public void pruneState(Object key, long seqNum) {
+ LOG.debug("Pruning in-memory state for key: {} up to seqNum: {}", key,
seqNum);
+
+ actionStates
+ .entrySet()
+ .removeIf(
+ entry -> {
+ String stateKey = entry.getKey();
+ if (stateKey.startsWith(key.toString() + "_")) {
+ try {
+ List<String> parts =
ActionStateUtil.parseKey(stateKey);
+ if (parts.size() >= 2) {
+ long stateSeqNum =
Long.parseLong(parts.get(1));
+ return stateSeqNum <= seqNum;
+ }
+ } catch (NumberFormatException e) {
+ LOG.warn("Failed to parse seqNum from
state key: {}", stateKey);
+ }
+ }
+ return false;
+ });
+
+ LOG.debug("Pruned in-memory state for key: {} up to seqNum: {}", key,
seqNum);
Review Comment:
Nit: the debug logs at the beginning and end of this method are almost
identical; consider keeping only one of them.
##########
runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link FlussActionStateStore} serialization and in-memory
behavior. These tests
+ * verify the ObjectMapper configuration and ActionState serialization
round-trips without requiring
+ * a real Fluss cluster.
+ */
+public class FlussActionStateStoreTest {
Review Comment:
This test is currently closer to an ActionState serde test than a
`FlussActionStateStore` test, which should focus on store-level behavior such as
`put`, `get`, `rebuildState`, `pruneState`, etc.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ * <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ * <li><b>Write</b>: Append to log + update in-memory HashMap.
+ * <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if
none) to rebuild
+ * in-memory state.
+ * <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a
log/append-only table):
+ *
+ * <pre>
+ * state_key STRING -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES -- ActionState JSON serialized bytes
+ * agent_key STRING -- Original key; used as bucket key so all records
for the same
+ * agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlussActionStateStore.class);
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+ /**
+ * Number of consecutive empty polls before considering the log fully
consumed during recovery.
+ */
+ private static final int EMPTY_POLL_THRESHOLD = 3;
+
+ private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+ /** Creates and configures the ObjectMapper for ActionState serialization.
*/
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+ return mapper;
+ }
+
+ /** Mixin to add type information for Event hierarchy. */
+ @JsonTypeInfo(
+ use = JsonTypeInfo.Id.CLASS,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "@class")
+ abstract static class EventTypeInfoMixin {}
+
+ // Column indices in the Fluss table schema
+ private static final int COL_STATE_KEY = 0;
+ private static final int COL_STATE_PAYLOAD = 1;
+ private static final int COL_AGENT_KEY = 2;
+ private static final int NUM_COLUMNS = 3;
+
+ private final AgentConfiguration agentConfiguration;
+ private final String databaseName;
+ private final String tableName;
+ private final TablePath tablePath;
+
+ private final Connection connection;
+ private final Table table;
+ private final AppendWriter writer;
+
+ /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on
recovery. */
+ private final Map<String, ActionState> actionStates;
+
+ public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+ this.agentConfiguration = agentConfiguration;
+ this.databaseName =
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+ this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+ this.tablePath = TablePath.of(databaseName, tableName);
+ this.actionStates = new HashMap<>();
+
+ Configuration flussConf = new Configuration();
+ flussConf.setString("bootstrap.servers",
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+ String securityProtocol =
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+ if (securityProtocol != null) {
+ flussConf.setString("client.security.protocol", securityProtocol);
+ }
+ String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+ if (saslMechanism != null) {
+ flussConf.setString("client.security.sasl.mechanism",
saslMechanism);
+ }
+ String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+ if (jaasConfig != null) {
+ flussConf.setString("client.security.sasl.jaas.config",
jaasConfig);
+ }
+ String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+ if (username != null) {
+ flussConf.setString("client.security.sasl.username", username);
+ }
+ String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+ if (password != null) {
+ flussConf.setString("client.security.sasl.password", password);
+ }
+
+ this.connection = ConnectionFactory.createConnection(flussConf);
+ maybeCreateDatabaseAndTable();
+ this.table = connection.getTable(tablePath);
+ this.writer = table.newAppend().createWriter();
+
+ LOG.info(
+ "Initialized FlussActionStateStore (log table) with table:
{}.{}",
+ databaseName,
+ tableName);
+ }
+
+ @Override
+ public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
+ throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+ GenericRow row = new GenericRow(NUM_COLUMNS);
+ row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+ row.setField(COL_STATE_PAYLOAD, payload);
+ row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+ // Append to Fluss log for durability, then update in-memory cache.
+ // Synchronous write ensures the record is durable before returning.
+ writer.append(row).get();
+ actionStates.put(stateKey, state);
+
+ LOG.debug("Stored action state: key={}, isCompleted={}", stateKey,
state.isCompleted());
+ }
+
+ @Override
+ public ActionState get(Object key, long seqNum, Action action, Event
event) throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ ActionState state = actionStates.get(stateKey);
+ LOG.debug("Lookup action state: key={}, found={}", stateKey, state !=
null);
+ return state;
+ }
+
+ /**
+ * Rebuilds in-memory state by scanning the Fluss log table. If recovery
markers are provided,
+ * computes the minimum offset per bucket across all markers and
subscribes from those offsets.
+ * Otherwise, subscribes from the beginning. Reads until the log is
exhausted ({@link
+ * #EMPTY_POLL_THRESHOLD} consecutive empty polls). For the same state key
appearing multiple
+ * times in the log, the latest record wins (last-write-wins).
+ *
+ * <p>Note: Unlike the Kafka implementation which skips rebuild on empty
markers, this
+ * implementation scans from the beginning when no markers are provided,
enabling full state
+ * recovery even without prior checkpoints.
+ */
+ @Override
+ public void rebuildState(List<Object> recoveryMarkers) {
+ LOG.info(
+ "Rebuilding action state from Fluss log table with {} recovery
markers",
+ recoveryMarkers.size());
+ actionStates.clear();
+
+ int numBuckets = table.getTableInfo().getNumBuckets();
+
+ // Compute per-bucket start offsets from recovery markers
+ Map<Integer, Long> bucketOffsets = new HashMap<>();
+ for (Object marker : recoveryMarkers) {
+ if (marker instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
+ for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
+ bucketOffsets.merge(entry.getKey(), entry.getValue(),
Math::min);
+ }
+ } else if (marker != null) {
+ LOG.warn(
+ "Ignoring unrecognized recovery marker type: {}",
+ marker.getClass().getName());
+ }
+ }
+
+ try (LogScanner scanner = table.newScan().createLogScanner()) {
+ if (bucketOffsets.isEmpty()) {
+ for (int b = 0; b < numBuckets; b++) {
+ scanner.subscribeFromBeginning(b);
+ }
+ } else {
+ for (Map.Entry<Integer, Long> entry :
bucketOffsets.entrySet()) {
+ scanner.subscribe(entry.getKey(), entry.getValue());
+ }
+ }
+
+ int emptyPollCount = 0;
+ while (emptyPollCount < EMPTY_POLL_THRESHOLD) {
+ ScanRecords records = scanner.poll(POLL_TIMEOUT);
+ if (records.isEmpty()) {
+ emptyPollCount++;
+ } else {
+ emptyPollCount = 0;
+ for (TableBucket bucket : records.buckets()) {
+ for (ScanRecord record : records.records(bucket)) {
+ InternalRow row = record.getRow();
+ String stateKey =
row.getString(COL_STATE_KEY).toString();
+ byte[] payload = row.getBytes(COL_STATE_PAYLOAD);
+ try {
+ ActionState state =
+ OBJECT_MAPPER.readValue(payload,
ActionState.class);
+ actionStates.put(stateKey, state);
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to deserialize action state
for key: {}, skipping",
+ stateKey,
+ e);
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to rebuild state from Fluss log
table", e);
+ }
+
+ LOG.info("Completed rebuilding state, recovered {} states",
actionStates.size());
+ }
+
+ /**
+ * Returns the end offsets of each bucket as a recovery marker. Similar to
Kafka's
+ * implementation, this captures the current log position so that {@link
#rebuildState} can
+ * resume from these offsets instead of scanning from the beginning.
+ */
+ @Override
+ public Object getRecoveryMarker() {
+ int numBuckets = table.getTableInfo().getNumBuckets();
+
+ try (Admin admin = connection.getAdmin()) {
+ List<Integer> buckets = new ArrayList<>();
+ for (int b = 0; b < numBuckets; b++) {
+ buckets.add(b);
+ }
+ return new HashMap<>(
+ admin.listOffsets(tablePath, buckets, new
OffsetSpec.LatestSpec()).all().get());
+ } catch (Exception e) {
+ LOG.error("Failed to get end offsets for Fluss table: {}",
tablePath, e);
+ throw new RuntimeException("Failed to get end offsets for Fluss
table", e);
+ }
+ }
+
+ /**
+ * Evicts pruned states from the in-memory cache. The Fluss log is
append-only; physical cleanup
+ * relies on Fluss log retention configuration.
+ */
+ @Override
+ public void pruneState(Object key, long seqNum) {
+ LOG.debug("Pruning in-memory state for key: {} up to seqNum: {}", key,
seqNum);
+
+ actionStates
+ .entrySet()
+ .removeIf(
+ entry -> {
+ String stateKey = entry.getKey();
+ if (stateKey.startsWith(key.toString() + "_")) {
+ try {
+ List<String> parts =
ActionStateUtil.parseKey(stateKey);
+ if (parts.size() >= 2) {
+ long stateSeqNum =
Long.parseLong(parts.get(1));
+ return stateSeqNum <= seqNum;
+ }
+ } catch (NumberFormatException e) {
+ LOG.warn("Failed to parse seqNum from
state key: {}", stateKey);
+ }
+ }
+ return false;
+ });
+
+ LOG.debug("Pruned in-memory state for key: {} up to seqNum: {}", key,
seqNum);
+ }
+
+ @Override
+ public void close() throws Exception {
+ Exception firstException = null;
Review Comment:
Could we simplify this close logic? Given that each `put()` waits for
`writer.append(row).get()`, I am not sure an extra `writer.flush()` is needed
here. The `firstException` handling also feels more complex than necessary.
Maybe we can just close the table and ensure the connection is closed in
`finally`, letting any close exception propagate normally.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]