joeyutong commented on code in PR #628:
URL: https://github.com/apache/flink-agents/pull/628#discussion_r3151827392


##########
docs/content/docs/operations/configuration.md:
##########
@@ -140,14 +140,35 @@ Here is the list of all built-in core configuration 
options.
 
 ### Action State Store
 
+#### Common
+
+| Key                          | Default          | Type    | Description      
                                                                        |
+|------------------------------|------------------|---------|------------------------------------------------------------------------------------------|
+| `actionStateStoreBackend`    | (none)           | String  | The backend for 
action state store. Supported values: `"kafka"`, `"fluss"`.              |
+
 #### Kafka-based Action State Store
 
 Here are the configuration options for Kafka-based Action State Store.
 
 | Key                                 | Default                  | Type    | 
Description                                                                 |
 
|-------------------------------------|--------------------------|---------|-----------------------------------------------------------------------------|
-| `actionStateStoreBackend`           | (none)                   | String  | 
The config parameter specifies the backend for action state store.          |
 | `kafkaBootstrapServers`             | "localhost:9092"         | String  | 
The config parameter specifies the Kafka bootstrap server.                  |
 | `kafkaActionStateTopic`             | (none)                   | String  | 
The config parameter specifies the Kafka topic for action state.            |
 | `kafkaActionStateTopicNumPartitions`| 64                       | Integer | 
The config parameter specifies the number of partitions for the Kafka action 
state topic. |
 | `kafkaActionStateTopicReplicationFactor` | 1                     | Integer | 
The config parameter specifies the replication factor for the Kafka action 
state topic. |
+
+#### Fluss-based Action State Store
+
+Here are the configuration options for Fluss-based Action State Store.
+
+| Key                          | Default          | Type    | Description      
                                                                        |
+|------------------------------|------------------|---------|------------------------------------------------------------------------------------------|
+| `flussBootstrapServers`      | "localhost:9123" | String  | The Fluss 
bootstrap servers address.                                                     |
+| `flussActionStateDatabase`   | "flink_agents"   | String  | The Fluss 
database name for storing action state.                                        |
+| `flussActionStateTable`      | "action_state"   | String  | The Fluss table 
name for storing action state.                                           |

Review Comment:
   ```suggestion
   | `flussActionStateTable`      | "action_states"   | String  | The Fluss 
table name for storing action state.                                           |
   ```



##########
api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java:
##########
@@ -51,6 +51,44 @@ public class AgentConfigOptions {
     public static final ConfigOption<Integer> 
KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR =
             new ConfigOption<>("kafkaActionStateTopicReplicationFactor", 
Integer.class, 1);
 
+    /** The config parameter specifies the Fluss bootstrap servers. */
+    public static final ConfigOption<String> FLUSS_BOOTSTRAP_SERVERS =
+            new ConfigOption<>("flussBootstrapServers", String.class, 
"localhost:9123");
+
+    /** The config parameter specifies the Fluss database for action state. */
+    public static final ConfigOption<String> FLUSS_ACTION_STATE_DATABASE =
+            new ConfigOption<>("flussActionStateDatabase", String.class, 
"flink_agents");
+
+    /** The config parameter specifies the Fluss table name for action state. 
*/
+    public static final ConfigOption<String> FLUSS_ACTION_STATE_TABLE =
+            new ConfigOption<>("flussActionStateTable", String.class, 
"action_state");

Review Comment:
   ```suggestion
               new ConfigOption<>("flussActionStateTable", String.class, 
"action_states");
   ```



##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static 
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log 
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the 
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ *   <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ *   <li><b>Write</b>: Append to log + update in-memory HashMap.
+ *   <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if 
none) to rebuild
+ *       in-memory state.
+ *   <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on 
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a 
log/append-only table):
+ *
+ * <pre>
+ * state_key     STRING  -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES   -- ActionState JSON serialized bytes
+ * agent_key     STRING  -- Original key; used as bucket key so all records 
for the same
+ *                          agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlussActionStateStore.class);
+    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+    /**
+     * Number of consecutive empty polls before considering the log fully 
consumed during recovery.
+     */
+    private static final int EMPTY_POLL_THRESHOLD = 3;
+
+    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+    /** Creates and configures the ObjectMapper for ActionState serialization. 
*/
+    private static ObjectMapper createObjectMapper() {
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+        return mapper;
+    }
+
+    /** Mixin to add type information for Event hierarchy. */
+    @JsonTypeInfo(
+            use = JsonTypeInfo.Id.CLASS,
+            include = JsonTypeInfo.As.PROPERTY,
+            property = "@class")
+    abstract static class EventTypeInfoMixin {}
+
+    // Column indices in the Fluss table schema
+    private static final int COL_STATE_KEY = 0;
+    private static final int COL_STATE_PAYLOAD = 1;
+    private static final int COL_AGENT_KEY = 2;
+    private static final int NUM_COLUMNS = 3;
+
+    private final AgentConfiguration agentConfiguration;
+    private final String databaseName;
+    private final String tableName;
+    private final TablePath tablePath;
+
+    private final Connection connection;
+    private final Table table;
+    private final AppendWriter writer;
+
+    /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on 
recovery. */
+    private final Map<String, ActionState> actionStates;
+
+    public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+        this.agentConfiguration = agentConfiguration;
+        this.databaseName = 
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+        this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+        this.tablePath = TablePath.of(databaseName, tableName);
+        this.actionStates = new HashMap<>();
+
+        Configuration flussConf = new Configuration();
+        flussConf.setString("bootstrap.servers", 
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+        String securityProtocol = 
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+        if (securityProtocol != null) {
+            flussConf.setString("client.security.protocol", securityProtocol);
+        }
+        String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+        if (saslMechanism != null) {
+            flussConf.setString("client.security.sasl.mechanism", 
saslMechanism);
+        }
+        String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+        if (jaasConfig != null) {
+            flussConf.setString("client.security.sasl.jaas.config", 
jaasConfig);
+        }
+        String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+        if (username != null) {
+            flussConf.setString("client.security.sasl.username", username);
+        }
+        String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+        if (password != null) {
+            flussConf.setString("client.security.sasl.password", password);
+        }
+
+        this.connection = ConnectionFactory.createConnection(flussConf);
+        maybeCreateDatabaseAndTable();
+        this.table = connection.getTable(tablePath);
+        this.writer = table.newAppend().createWriter();
+
+        LOG.info(
+                "Initialized FlussActionStateStore (log table) with table: 
{}.{}",
+                databaseName,
+                tableName);
+    }
+
+    @Override
+    public void put(Object key, long seqNum, Action action, Event event, 
ActionState state)
+            throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+        GenericRow row = new GenericRow(NUM_COLUMNS);
+        row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+        row.setField(COL_STATE_PAYLOAD, payload);
+        row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+        // Append to Fluss log for durability, then update in-memory cache.
+        // Synchronous write ensures the record is durable before returning.
+        writer.append(row).get();
+        actionStates.put(stateKey, state);
+
+        LOG.debug("Stored action state: key={}, isCompleted={}", stateKey, 
state.isCompleted());
+    }
+
+    @Override
+    public ActionState get(Object key, long seqNum, Action action, Event 
event) throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        ActionState state = actionStates.get(stateKey);
+        LOG.debug("Lookup action state: key={}, found={}", stateKey, state != 
null);
+        return state;
+    }
+
+    /**
+     * Rebuilds in-memory state by scanning the Fluss log table. If recovery 
markers are provided,
+     * computes the minimum offset per bucket across all markers and 
subscribes from those offsets.
+     * Otherwise, subscribes from the beginning. Reads until the log is 
exhausted ({@link
+     * #EMPTY_POLL_THRESHOLD} consecutive empty polls). For the same state key 
appearing multiple
+     * times in the log, the latest record wins (last-write-wins).
+     *
+     * <p>Note: Unlike the Kafka implementation which skips rebuild on empty 
markers, this
+     * implementation scans from the beginning when no markers are provided, 
enabling full state
+     * recovery even without prior checkpoints.
+     */
+    @Override
+    public void rebuildState(List<Object> recoveryMarkers) {
+        LOG.info(
+                "Rebuilding action state from Fluss log table with {} recovery 
markers",
+                recoveryMarkers.size());
+        actionStates.clear();
+
+        int numBuckets = table.getTableInfo().getNumBuckets();
+
+        // Compute per-bucket start offsets from recovery markers
+        Map<Integer, Long> bucketOffsets = new HashMap<>();
+        for (Object marker : recoveryMarkers) {
+            if (marker instanceof Map) {
+                @SuppressWarnings("unchecked")
+                Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
+                for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
+                    bucketOffsets.merge(entry.getKey(), entry.getValue(), 
Math::min);
+                }
+            } else if (marker != null) {
+                LOG.warn(
+                        "Ignoring unrecognized recovery marker type: {}",
+                        marker.getClass().getName());
+            }
+        }
+
+        try (LogScanner scanner = table.newScan().createLogScanner()) {
+            if (bucketOffsets.isEmpty()) {
+                for (int b = 0; b < numBuckets; b++) {
+                    scanner.subscribeFromBeginning(b);
+                }
+            } else {
+                for (Map.Entry<Integer, Long> entry : 
bucketOffsets.entrySet()) {
+                    scanner.subscribe(entry.getKey(), entry.getValue());
+                }
+            }
+
+            int emptyPollCount = 0;
+            while (emptyPollCount < EMPTY_POLL_THRESHOLD) {
+                ScanRecords records = scanner.poll(POLL_TIMEOUT);
+                if (records.isEmpty()) {
+                    emptyPollCount++;
+                } else {
+                    emptyPollCount = 0;
+                    for (TableBucket bucket : records.buckets()) {
+                        for (ScanRecord record : records.records(bucket)) {
+                            InternalRow row = record.getRow();
+                            String stateKey = 
row.getString(COL_STATE_KEY).toString();
+                            byte[] payload = row.getBytes(COL_STATE_PAYLOAD);
+                            try {
+                                ActionState state =
+                                        OBJECT_MAPPER.readValue(payload, 
ActionState.class);
+                                actionStates.put(stateKey, state);
+                            } catch (Exception e) {
+                                LOG.warn(
+                                        "Failed to deserialize action state 
for key: {}, skipping",
+                                        stateKey,
+                                        e);
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to rebuild state from Fluss log 
table", e);
+        }
+
+        LOG.info("Completed rebuilding state, recovered {} states", 
actionStates.size());
+    }
+
+    /**
+     * Returns the end offsets of each bucket as a recovery marker. Similar to 
Kafka's
+     * implementation, this captures the current log position so that {@link 
#rebuildState} can
+     * resume from these offsets instead of scanning from the beginning.
+     */
+    @Override
+    public Object getRecoveryMarker() {
+        int numBuckets = table.getTableInfo().getNumBuckets();
+
+        try (Admin admin = connection.getAdmin()) {
+            List<Integer> buckets = new ArrayList<>();
+            for (int b = 0; b < numBuckets; b++) {
+                buckets.add(b);
+            }
+            return new HashMap<>(
+                    admin.listOffsets(tablePath, buckets, new 
OffsetSpec.LatestSpec()).all().get());
+        } catch (Exception e) {
+            LOG.error("Failed to get end offsets for Fluss table: {}", 
tablePath, e);
+            throw new RuntimeException("Failed to get end offsets for Fluss 
table", e);
+        }
+    }
+
+    /**
+     * Evicts pruned states from the in-memory cache. The Fluss log is 
append-only; physical cleanup
+     * relies on Fluss log retention configuration.
+     */
+    @Override
+    public void pruneState(Object key, long seqNum) {
+        LOG.debug("Pruning in-memory state for key: {} up to seqNum: {}", key, 
seqNum);
+
+        actionStates
+                .entrySet()
+                .removeIf(
+                        entry -> {
+                            String stateKey = entry.getKey();
+                            if (stateKey.startsWith(key.toString() + "_")) {
+                                try {
+                                    List<String> parts = 
ActionStateUtil.parseKey(stateKey);
+                                    if (parts.size() >= 2) {
+                                        long stateSeqNum = 
Long.parseLong(parts.get(1));
+                                        return stateSeqNum <= seqNum;
+                                    }
+                                } catch (NumberFormatException e) {
+                                    LOG.warn("Failed to parse seqNum from 
state key: {}", stateKey);
+                                }
+                            }
+                            return false;
+                        });
+
+        LOG.debug("Pruned in-memory state for key: {} up to seqNum: {}", key, 
seqNum);
+    }
+
+    @Override
+    public void close() throws Exception {
+        Exception firstException = null;
+
+        try {
+            if (writer != null) {
+                writer.flush();
+            }
+        } catch (Exception e) {
+            firstException = e;
+        }
+
+        try {
+            if (table != null) {
+                table.close();
+            }
+        } catch (Exception e) {
+            if (firstException == null) {
+                firstException = e;
+            }
+        }
+
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (Exception e) {
+            if (firstException == null) {
+                firstException = e;
+            }
+        }
+
+        if (firstException != null) {
+            throw firstException;
+        }
+    }
+
+    private void maybeCreateDatabaseAndTable() {
+        try (Admin admin = connection.getAdmin()) {
+            if (!admin.databaseExists(databaseName).get()) {
+                admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, 
true).get();
+                LOG.info("Created Fluss database: {}", databaseName);
+            }
+
+            if (!admin.tableExists(tablePath).get()) {
+                // No primaryKey() call — this creates an append-only log 
table in Fluss.
+                Schema schema =
+                        Schema.newBuilder()

Review Comment:
   Could we also define constants for the column names?



##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static 
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log 
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the 
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ *   <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ *   <li><b>Write</b>: Append to log + update in-memory HashMap.
+ *   <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if 
none) to rebuild
+ *       in-memory state.
+ *   <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on 
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a 
log/append-only table):
+ *
+ * <pre>
+ * state_key     STRING  -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES   -- ActionState JSON serialized bytes
+ * agent_key     STRING  -- Original key; used as bucket key so all records 
for the same
+ *                          agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlussActionStateStore.class);
+    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+    /**
+     * Number of consecutive empty polls before considering the log fully 
consumed during recovery.
+     */
+    private static final int EMPTY_POLL_THRESHOLD = 3;
+
+    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+    /** Creates and configures the ObjectMapper for ActionState serialization. 
*/
+    private static ObjectMapper createObjectMapper() {
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+        return mapper;
+    }
+
+    /** Mixin to add type information for Event hierarchy. */
+    @JsonTypeInfo(
+            use = JsonTypeInfo.Id.CLASS,
+            include = JsonTypeInfo.As.PROPERTY,
+            property = "@class")
+    abstract static class EventTypeInfoMixin {}
+
+    // Column indices in the Fluss table schema
+    private static final int COL_STATE_KEY = 0;
+    private static final int COL_STATE_PAYLOAD = 1;
+    private static final int COL_AGENT_KEY = 2;
+    private static final int NUM_COLUMNS = 3;
+
+    private final AgentConfiguration agentConfiguration;
+    private final String databaseName;
+    private final String tableName;
+    private final TablePath tablePath;
+
+    private final Connection connection;
+    private final Table table;
+    private final AppendWriter writer;
+
+    /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on 
recovery. */
+    private final Map<String, ActionState> actionStates;
+
+    public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+        this.agentConfiguration = agentConfiguration;
+        this.databaseName = 
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+        this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+        this.tablePath = TablePath.of(databaseName, tableName);
+        this.actionStates = new HashMap<>();
+
+        Configuration flussConf = new Configuration();
+        flussConf.setString("bootstrap.servers", 
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+        String securityProtocol = 
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+        if (securityProtocol != null) {
+            flussConf.setString("client.security.protocol", securityProtocol);
+        }
+        String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+        if (saslMechanism != null) {
+            flussConf.setString("client.security.sasl.mechanism", 
saslMechanism);
+        }
+        String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+        if (jaasConfig != null) {
+            flussConf.setString("client.security.sasl.jaas.config", 
jaasConfig);
+        }
+        String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+        if (username != null) {
+            flussConf.setString("client.security.sasl.username", username);
+        }
+        String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+        if (password != null) {
+            flussConf.setString("client.security.sasl.password", password);
+        }
+
+        this.connection = ConnectionFactory.createConnection(flussConf);
+        maybeCreateDatabaseAndTable();
+        this.table = connection.getTable(tablePath);
+        this.writer = table.newAppend().createWriter();
+
+        LOG.info(
+                "Initialized FlussActionStateStore (log table) with table: 
{}.{}",
+                databaseName,
+                tableName);
+    }
+
+    @Override
+    public void put(Object key, long seqNum, Action action, Event event, 
ActionState state)
+            throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+        GenericRow row = new GenericRow(NUM_COLUMNS);
+        row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+        row.setField(COL_STATE_PAYLOAD, payload);
+        row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+        // Append to Fluss log for durability, then update in-memory cache.
+        // Synchronous write ensures the record is durable before returning.
+        writer.append(row).get();
+        actionStates.put(stateKey, state);
+
+        LOG.debug("Stored action state: key={}, isCompleted={}", stateKey, 
state.isCompleted());
+    }
+
+    @Override
+    public ActionState get(Object key, long seqNum, Action action, Event 
event) throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        ActionState state = actionStates.get(stateKey);
+        LOG.debug("Lookup action state: key={}, found={}", stateKey, state != 
null);
+        return state;
+    }
+
+    /**
+     * Rebuilds in-memory state by scanning the Fluss log table. If recovery 
markers are provided,
+     * computes the minimum offset per bucket across all markers and 
subscribes from those offsets.
+     * Otherwise, subscribes from the beginning. Reads until the log is 
exhausted ({@link
+     * #EMPTY_POLL_THRESHOLD} consecutive empty polls). For the same state key 
appearing multiple
+     * times in the log, the latest record wins (last-write-wins).
+     *
+     * <p>Note: Unlike the Kafka implementation which skips rebuild on empty 
markers, this
+     * implementation scans from the beginning when no markers are provided, 
enabling full state
+     * recovery even without prior checkpoints.
+     */
+    @Override
+    public void rebuildState(List<Object> recoveryMarkers) {
+        LOG.info(
+                "Rebuilding action state from Fluss log table with {} recovery 
markers",
+                recoveryMarkers.size());
+        actionStates.clear();
+
+        int numBuckets = table.getTableInfo().getNumBuckets();
+
+        // Compute per-bucket start offsets from recovery markers
+        Map<Integer, Long> bucketOffsets = new HashMap<>();
+        for (Object marker : recoveryMarkers) {
+            if (marker instanceof Map) {
+                @SuppressWarnings("unchecked")
+                Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
+                for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
+                    bucketOffsets.merge(entry.getKey(), entry.getValue(), 
Math::min);
+                }
+            } else if (marker != null) {
+                LOG.warn(
+                        "Ignoring unrecognized recovery marker type: {}",
+                        marker.getClass().getName());
+            }
+        }
+
+        try (LogScanner scanner = table.newScan().createLogScanner()) {
+            if (bucketOffsets.isEmpty()) {
+                for (int b = 0; b < numBuckets; b++) {
+                    scanner.subscribeFromBeginning(b);
+                }
+            } else {
+                for (Map.Entry<Integer, Long> entry : 
bucketOffsets.entrySet()) {
+                    scanner.subscribe(entry.getKey(), entry.getValue());
+                }
+            }
+
+            int emptyPollCount = 0;
+            while (emptyPollCount < EMPTY_POLL_THRESHOLD) {
+                ScanRecords records = scanner.poll(POLL_TIMEOUT);
+                if (records.isEmpty()) {
+                    emptyPollCount++;
+                } else {
+                    emptyPollCount = 0;
+                    for (TableBucket bucket : records.buckets()) {
+                        for (ScanRecord record : records.records(bucket)) {
+                            InternalRow row = record.getRow();
+                            String stateKey = 
row.getString(COL_STATE_KEY).toString();
+                            byte[] payload = row.getBytes(COL_STATE_PAYLOAD);
+                            try {
+                                ActionState state =
+                                        OBJECT_MAPPER.readValue(payload, 
ActionState.class);
+                                actionStates.put(stateKey, state);
+                            } catch (Exception e) {
+                                LOG.warn(
+                                        "Failed to deserialize action state 
for key: {}, skipping",
+                                        stateKey,
+                                        e);
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to rebuild state from Fluss log 
table", e);
+        }
+
+        LOG.info("Completed rebuilding state, recovered {} states", 
actionStates.size());
+    }
+
+    /**
+     * Returns the end offsets of each bucket as a recovery marker. Similar to 
Kafka's
+     * implementation, this captures the current log position so that {@link 
#rebuildState} can
+     * resume from these offsets instead of scanning from the beginning.
+     */
+    @Override
+    public Object getRecoveryMarker() {
+        int numBuckets = table.getTableInfo().getNumBuckets();
+
+        try (Admin admin = connection.getAdmin()) {
+            List<Integer> buckets = new ArrayList<>();
+            for (int b = 0; b < numBuckets; b++) {
+                buckets.add(b);
+            }
+            return new HashMap<>(

Review Comment:
   Nit: do we need the extra `new HashMap<>(...)` here? Since the method 
returns `Object`, the map from `listOffsets(...).all().get()` can be returned 
directly unless a defensive copy is intentional.
   



##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static 
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log 
table as the backend.

Review Comment:
   Could we simplify this class-level comment to focus on the current contract 
instead of the implementation history? The `HashMap` sentence and the “Compared 
to the previous KV-table implementation” section feel a bit redundant. Also, 
`agent_key` is the original input key, so “same agent” should probably be “same 
input key”.



##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static 
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log 
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the 
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ *   <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ *   <li><b>Write</b>: Append to log + update in-memory HashMap.
+ *   <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if 
none) to rebuild
+ *       in-memory state.
+ *   <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on 
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a 
log/append-only table):
+ *
+ * <pre>
+ * state_key     STRING  -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES   -- ActionState JSON serialized bytes
+ * agent_key     STRING  -- Original key; used as bucket key so all records 
for the same
+ *                          agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlussActionStateStore.class);
+    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();

Review Comment:
   Could we reuse the existing `ActionStateKafkaSeder` serde logic here?
   



##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static 
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log 
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the 
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ *   <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ *   <li><b>Write</b>: Append to log + update in-memory HashMap.
+ *   <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if 
none) to rebuild
+ *       in-memory state.
+ *   <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on 
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a 
log/append-only table):
+ *
+ * <pre>
+ * state_key     STRING  -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES   -- ActionState JSON serialized bytes
+ * agent_key     STRING  -- Original key; used as bucket key so all records 
for the same
+ *                          agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlussActionStateStore.class);
+    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+    /**
+     * Number of consecutive empty polls before considering the log fully 
consumed during recovery.
+     */
+    private static final int EMPTY_POLL_THRESHOLD = 3;
+
+    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+    /** Creates and configures the ObjectMapper for ActionState serialization. 
*/
+    private static ObjectMapper createObjectMapper() {
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+        return mapper;
+    }
+
+    /** Mixin to add type information for Event hierarchy. */
+    @JsonTypeInfo(
+            use = JsonTypeInfo.Id.CLASS,
+            include = JsonTypeInfo.As.PROPERTY,
+            property = "@class")
+    abstract static class EventTypeInfoMixin {}
+
+    // Column indices in the Fluss table schema
+    private static final int COL_STATE_KEY = 0;
+    private static final int COL_STATE_PAYLOAD = 1;
+    private static final int COL_AGENT_KEY = 2;
+    private static final int NUM_COLUMNS = 3;
+
+    private final AgentConfiguration agentConfiguration;
+    private final String databaseName;
+    private final String tableName;
+    private final TablePath tablePath;
+
+    private final Connection connection;
+    private final Table table;
+    private final AppendWriter writer;
+
+    /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on 
recovery. */
+    private final Map<String, ActionState> actionStates;
+
+    public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+        this.agentConfiguration = agentConfiguration;
+        this.databaseName = 
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+        this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+        this.tablePath = TablePath.of(databaseName, tableName);
+        this.actionStates = new HashMap<>();
+
+        Configuration flussConf = new Configuration();
+        flussConf.setString("bootstrap.servers", 
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+        String securityProtocol = 
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+        if (securityProtocol != null) {
+            flussConf.setString("client.security.protocol", securityProtocol);
+        }
+        String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+        if (saslMechanism != null) {
+            flussConf.setString("client.security.sasl.mechanism", 
saslMechanism);
+        }
+        String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+        if (jaasConfig != null) {
+            flussConf.setString("client.security.sasl.jaas.config", 
jaasConfig);
+        }
+        String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+        if (username != null) {
+            flussConf.setString("client.security.sasl.username", username);
+        }
+        String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+        if (password != null) {
+            flussConf.setString("client.security.sasl.password", password);
+        }
+
+        this.connection = ConnectionFactory.createConnection(flussConf);
+        maybeCreateDatabaseAndTable();
+        this.table = connection.getTable(tablePath);
+        this.writer = table.newAppend().createWriter();
+
+        LOG.info(
+                "Initialized FlussActionStateStore (log table) with table: 
{}.{}",
+                databaseName,
+                tableName);
+    }
+
+    @Override
+    public void put(Object key, long seqNum, Action action, Event event, 
ActionState state)
+            throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+        GenericRow row = new GenericRow(NUM_COLUMNS);
+        row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+        row.setField(COL_STATE_PAYLOAD, payload);
+        row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+        // Append to Fluss log for durability, then update in-memory cache.
+        // Synchronous write ensures the record is durable before returning.
+        writer.append(row).get();
+        actionStates.put(stateKey, state);
+
+        LOG.debug("Stored action state: key={}, isCompleted={}", stateKey, 
state.isCompleted());
+    }
+
+    @Override
+    public ActionState get(Object key, long seqNum, Action action, Event 
event) throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);

Review Comment:
   Could we also handle divergence in `get()`? `KafkaActionStateStore#get()` 
calls `checkDivergence(key, seqNum)` and removes cached states beyond the 
requested sequence when divergence is detected. The Fluss backend currently 
only does a direct map lookup, which makes the two backends behave differently 
during divergent replay.
   



##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static 
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log 
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the 
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ *   <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ *   <li><b>Write</b>: Append to log + update in-memory HashMap.
+ *   <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if 
none) to rebuild
+ *       in-memory state.
+ *   <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on 
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a 
log/append-only table):
+ *
+ * <pre>
+ * state_key     STRING  -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES   -- ActionState JSON serialized bytes
+ * agent_key     STRING  -- Original key; used as bucket key so all records 
for the same
+ *                          agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlussActionStateStore.class);
+    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+    /**
+     * Number of consecutive empty polls before considering the log fully 
consumed during recovery.
+     */
+    private static final int EMPTY_POLL_THRESHOLD = 3;
+
+    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+    /** Creates and configures the ObjectMapper for ActionState serialization. 
*/
+    private static ObjectMapper createObjectMapper() {
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+        return mapper;
+    }
+
+    /** Mixin to add type information for Event hierarchy. */
+    @JsonTypeInfo(
+            use = JsonTypeInfo.Id.CLASS,
+            include = JsonTypeInfo.As.PROPERTY,
+            property = "@class")
+    abstract static class EventTypeInfoMixin {}
+
+    // Column indices in the Fluss table schema
+    private static final int COL_STATE_KEY = 0;
+    private static final int COL_STATE_PAYLOAD = 1;
+    private static final int COL_AGENT_KEY = 2;
+    private static final int NUM_COLUMNS = 3;
+
+    private final AgentConfiguration agentConfiguration;
+    private final String databaseName;
+    private final String tableName;
+    private final TablePath tablePath;
+
+    private final Connection connection;
+    private final Table table;
+    private final AppendWriter writer;
+
+    /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on 
recovery. */
+    private final Map<String, ActionState> actionStates;
+
+    public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+        this.agentConfiguration = agentConfiguration;
+        this.databaseName = 
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+        this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+        this.tablePath = TablePath.of(databaseName, tableName);
+        this.actionStates = new HashMap<>();
+
+        Configuration flussConf = new Configuration();
+        flussConf.setString("bootstrap.servers", 
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+        String securityProtocol = 
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+        if (securityProtocol != null) {
+            flussConf.setString("client.security.protocol", securityProtocol);
+        }
+        String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+        if (saslMechanism != null) {
+            flussConf.setString("client.security.sasl.mechanism", 
saslMechanism);
+        }
+        String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+        if (jaasConfig != null) {
+            flussConf.setString("client.security.sasl.jaas.config", 
jaasConfig);
+        }
+        String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+        if (username != null) {
+            flussConf.setString("client.security.sasl.username", username);
+        }
+        String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+        if (password != null) {
+            flussConf.setString("client.security.sasl.password", password);
+        }
+
+        this.connection = ConnectionFactory.createConnection(flussConf);
+        maybeCreateDatabaseAndTable();
+        this.table = connection.getTable(tablePath);
+        this.writer = table.newAppend().createWriter();
+
+        LOG.info(
+                "Initialized FlussActionStateStore (log table) with table: 
{}.{}",
+                databaseName,
+                tableName);
+    }
+
+    @Override
+    public void put(Object key, long seqNum, Action action, Event event, 
ActionState state)
+            throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+        GenericRow row = new GenericRow(NUM_COLUMNS);
+        row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+        row.setField(COL_STATE_PAYLOAD, payload);
+        row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+        // Append to Fluss log for durability, then update in-memory cache.
+        // Synchronous write ensures the record is durable before returning.
+        writer.append(row).get();
+        actionStates.put(stateKey, state);
+
+        LOG.debug("Stored action state: key={}, isCompleted={}", stateKey, 
state.isCompleted());
+    }
+
+    @Override
+    public ActionState get(Object key, long seqNum, Action action, Event 
event) throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        ActionState state = actionStates.get(stateKey);
+        LOG.debug("Lookup action state: key={}, found={}", stateKey, state != 
null);
+        return state;
+    }
+
+    /**
+     * Rebuilds in-memory state by scanning the Fluss log table. If recovery 
markers are provided,
+     * computes the minimum offset per bucket across all markers and 
subscribes from those offsets.
+     * Otherwise, subscribes from the beginning. Reads until the log is 
exhausted ({@link
+     * #EMPTY_POLL_THRESHOLD} consecutive empty polls). For the same state key 
appearing multiple
+     * times in the log, the latest record wins (last-write-wins).
+     *
+     * <p>Note: Unlike the Kafka implementation which skips rebuild on empty 
markers, this

Review Comment:
   Should we keep this behavior aligned with the Kafka backend? When 
`recoveryMarkers` or the derived bucket offsets are empty, Kafka skips rebuild 
because there is no checkpointed external-store position to recover from. 
Scanning the Fluss log from the beginning can restore state outside the Flink 
checkpoint boundary, for example on a fresh start or when the table contains 
retained data from a previous run. It also hides missing-marker issues and 
makes recovery cost depend on the full retained log.



##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static 
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log 
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the 
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ *   <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ *   <li><b>Write</b>: Append to log + update in-memory HashMap.
+ *   <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if 
none) to rebuild
+ *       in-memory state.
+ *   <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on 
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a 
log/append-only table):
+ *
+ * <pre>
+ * state_key     STRING  -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES   -- ActionState JSON serialized bytes
+ * agent_key     STRING  -- Original key; used as bucket key so all records 
for the same
+ *                          agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlussActionStateStore.class);
+    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+    /**
+     * Number of consecutive empty polls before considering the log fully 
consumed during recovery.
+     */
+    private static final int EMPTY_POLL_THRESHOLD = 3;
+
+    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+    /** Creates and configures the ObjectMapper for ActionState serialization. 
*/
+    private static ObjectMapper createObjectMapper() {
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+        return mapper;
+    }
+
+    /** Mixin to add type information for Event hierarchy. */
+    @JsonTypeInfo(
+            use = JsonTypeInfo.Id.CLASS,
+            include = JsonTypeInfo.As.PROPERTY,
+            property = "@class")
+    abstract static class EventTypeInfoMixin {}
+
+    // Column indices in the Fluss table schema
+    private static final int COL_STATE_KEY = 0;
+    private static final int COL_STATE_PAYLOAD = 1;
+    private static final int COL_AGENT_KEY = 2;
+    private static final int NUM_COLUMNS = 3;
+
+    private final AgentConfiguration agentConfiguration;
+    private final String databaseName;
+    private final String tableName;
+    private final TablePath tablePath;
+
+    private final Connection connection;
+    private final Table table;
+    private final AppendWriter writer;
+
+    /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on 
recovery. */
+    private final Map<String, ActionState> actionStates;
+
+    public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+        this.agentConfiguration = agentConfiguration;
+        this.databaseName = 
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+        this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+        this.tablePath = TablePath.of(databaseName, tableName);
+        this.actionStates = new HashMap<>();
+
+        Configuration flussConf = new Configuration();
+        flussConf.setString("bootstrap.servers", 
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+        String securityProtocol = 
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+        if (securityProtocol != null) {
+            flussConf.setString("client.security.protocol", securityProtocol);
+        }
+        String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+        if (saslMechanism != null) {
+            flussConf.setString("client.security.sasl.mechanism", 
saslMechanism);
+        }
+        String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+        if (jaasConfig != null) {
+            flussConf.setString("client.security.sasl.jaas.config", 
jaasConfig);
+        }
+        String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+        if (username != null) {
+            flussConf.setString("client.security.sasl.username", username);
+        }
+        String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+        if (password != null) {
+            flussConf.setString("client.security.sasl.password", password);
+        }
+
+        this.connection = ConnectionFactory.createConnection(flussConf);
+        maybeCreateDatabaseAndTable();
+        this.table = connection.getTable(tablePath);
+        this.writer = table.newAppend().createWriter();
+
+        LOG.info(
+                "Initialized FlussActionStateStore (log table) with table: 
{}.{}",
+                databaseName,
+                tableName);
+    }
+
+    @Override
+    public void put(Object key, long seqNum, Action action, Event event, 
ActionState state)
+            throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+        GenericRow row = new GenericRow(NUM_COLUMNS);

Review Comment:
   Nit: this could be simplified with `GenericRow.of(...)`. The row has only 
three fixed fields, so constructing it in one expression may be easier to read 
than allocating by column count and setting each field separately.



##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static 
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log 
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the 
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ *   <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ *   <li><b>Write</b>: Append to log + update in-memory HashMap.
+ *   <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if 
none) to rebuild
+ *       in-memory state.
+ *   <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on 
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a 
log/append-only table):
+ *
+ * <pre>
+ * state_key     STRING  -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES   -- ActionState JSON serialized bytes
+ * agent_key     STRING  -- Original key; used as bucket key so all records 
for the same
+ *                          agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlussActionStateStore.class);
+    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+    /**
+     * Number of consecutive empty polls before considering the log fully 
consumed during recovery.
+     */
+    private static final int EMPTY_POLL_THRESHOLD = 3;
+
+    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+    /** Creates and configures the ObjectMapper for ActionState serialization. 
*/
+    private static ObjectMapper createObjectMapper() {
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+        return mapper;
+    }
+
+    /** Mixin to add type information for Event hierarchy. */
+    @JsonTypeInfo(
+            use = JsonTypeInfo.Id.CLASS,
+            include = JsonTypeInfo.As.PROPERTY,
+            property = "@class")
+    abstract static class EventTypeInfoMixin {}
+
+    // Column indices in the Fluss table schema
+    private static final int COL_STATE_KEY = 0;
+    private static final int COL_STATE_PAYLOAD = 1;
+    private static final int COL_AGENT_KEY = 2;
+    private static final int NUM_COLUMNS = 3;
+
+    private final AgentConfiguration agentConfiguration;
+    private final String databaseName;
+    private final String tableName;
+    private final TablePath tablePath;
+
+    private final Connection connection;
+    private final Table table;
+    private final AppendWriter writer;
+
+    /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on 
recovery. */
+    private final Map<String, ActionState> actionStates;
+
+    public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+        this.agentConfiguration = agentConfiguration;
+        this.databaseName = 
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+        this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+        this.tablePath = TablePath.of(databaseName, tableName);
+        this.actionStates = new HashMap<>();
+
+        Configuration flussConf = new Configuration();
+        flussConf.setString("bootstrap.servers", 
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+        String securityProtocol = 
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+        if (securityProtocol != null) {
+            flussConf.setString("client.security.protocol", securityProtocol);
+        }
+        String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+        if (saslMechanism != null) {
+            flussConf.setString("client.security.sasl.mechanism", 
saslMechanism);
+        }
+        String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+        if (jaasConfig != null) {
+            flussConf.setString("client.security.sasl.jaas.config", 
jaasConfig);
+        }
+        String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+        if (username != null) {
+            flussConf.setString("client.security.sasl.username", username);
+        }
+        String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+        if (password != null) {
+            flussConf.setString("client.security.sasl.password", password);
+        }
+
+        this.connection = ConnectionFactory.createConnection(flussConf);
+        maybeCreateDatabaseAndTable();
+        this.table = connection.getTable(tablePath);
+        this.writer = table.newAppend().createWriter();
+
+        LOG.info(
+                "Initialized FlussActionStateStore (log table) with table: 
{}.{}",
+                databaseName,
+                tableName);
+    }
+
+    @Override
+    public void put(Object key, long seqNum, Action action, Event event, 
ActionState state)
+            throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+        GenericRow row = new GenericRow(NUM_COLUMNS);
+        row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+        row.setField(COL_STATE_PAYLOAD, payload);
+        row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+        // Append to Fluss log for durability, then update in-memory cache.
+        // Synchronous write ensures the record is durable before returning.
+        writer.append(row).get();
+        actionStates.put(stateKey, state);
+
+        LOG.debug("Stored action state: key={}, isCompleted={}", stateKey, 
state.isCompleted());
+    }
+
+    @Override
+    public ActionState get(Object key, long seqNum, Action action, Event 
event) throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        ActionState state = actionStates.get(stateKey);
+        LOG.debug("Lookup action state: key={}, found={}", stateKey, state != 
null);
+        return state;
+    }
+
+    /**
+     * Rebuilds in-memory state by scanning the Fluss log table. If recovery 
markers are provided,
+     * computes the minimum offset per bucket across all markers and 
subscribes from those offsets.
+     * Otherwise, subscribes from the beginning. Reads until the log is 
exhausted ({@link
+     * #EMPTY_POLL_THRESHOLD} consecutive empty polls). For the same state key 
appearing multiple
+     * times in the log, the latest record wins (last-write-wins).
+     *
+     * <p>Note: Unlike the Kafka implementation which skips rebuild on empty 
markers, this
+     * implementation scans from the beginning when no markers are provided, 
enabling full state
+     * recovery even without prior checkpoints.
+     */
+    @Override
+    public void rebuildState(List<Object> recoveryMarkers) {
+        LOG.info(
+                "Rebuilding action state from Fluss log table with {} recovery 
markers",
+                recoveryMarkers.size());
+        actionStates.clear();
+
+        int numBuckets = table.getTableInfo().getNumBuckets();
+
+        // Compute per-bucket start offsets from recovery markers
+        Map<Integer, Long> bucketOffsets = new HashMap<>();
+        for (Object marker : recoveryMarkers) {
+            if (marker instanceof Map) {
+                @SuppressWarnings("unchecked")
+                Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
+                for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
+                    bucketOffsets.merge(entry.getKey(), entry.getValue(), 
Math::min);
+                }
+            } else if (marker != null) {
+                LOG.warn(
+                        "Ignoring unrecognized recovery marker type: {}",
+                        marker.getClass().getName());
+            }
+        }
+
+        try (LogScanner scanner = table.newScan().createLogScanner()) {
+            if (bucketOffsets.isEmpty()) {
+                for (int b = 0; b < numBuckets; b++) {
+                    scanner.subscribeFromBeginning(b);
+                }
+            } else {
+                for (Map.Entry<Integer, Long> entry : 
bucketOffsets.entrySet()) {
+                    scanner.subscribe(entry.getKey(), entry.getValue());
+                }
+            }
+
+            int emptyPollCount = 0;
+            while (emptyPollCount < EMPTY_POLL_THRESHOLD) {

Review Comment:
   Could we use explicit stopping offsets instead of consecutive empty polls to 
terminate rebuild? AFAIK, rebuild can define a precise replay window: read from 
the marker offset up to the **latest offset** captured at rebuild start. 
   `admin.listOffsets(tablePath, buckets, new 
OffsetSpec.LatestSpec()).all().get()`
   



##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static 
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log 
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the 
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ *   <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ *   <li><b>Write</b>: Append to log + update in-memory HashMap.
+ *   <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if 
none) to rebuild
+ *       in-memory state.
+ *   <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on 
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a 
log/append-only table):
+ *
+ * <pre>
+ * state_key     STRING  -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES   -- ActionState JSON serialized bytes
+ * agent_key     STRING  -- Original key; used as bucket key so all records 
for the same
+ *                          agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlussActionStateStore.class);
+    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+    /**
+     * Number of consecutive empty polls before considering the log fully 
consumed during recovery.
+     */
+    private static final int EMPTY_POLL_THRESHOLD = 3;
+
+    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+    /** Creates and configures the ObjectMapper for ActionState serialization. 
*/
+    private static ObjectMapper createObjectMapper() {
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+        return mapper;
+    }
+
+    /** Mixin to add type information for Event hierarchy. */
+    @JsonTypeInfo(
+            use = JsonTypeInfo.Id.CLASS,
+            include = JsonTypeInfo.As.PROPERTY,
+            property = "@class")
+    abstract static class EventTypeInfoMixin {}
+
+    // Column indices in the Fluss table schema
+    private static final int COL_STATE_KEY = 0;
+    private static final int COL_STATE_PAYLOAD = 1;
+    private static final int COL_AGENT_KEY = 2;
+    private static final int NUM_COLUMNS = 3;
+
+    private final AgentConfiguration agentConfiguration;
+    private final String databaseName;
+    private final String tableName;
+    private final TablePath tablePath;
+
+    private final Connection connection;
+    private final Table table;
+    private final AppendWriter writer;
+
+    /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on 
recovery. */
+    private final Map<String, ActionState> actionStates;
+
+    public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+        this.agentConfiguration = agentConfiguration;
+        this.databaseName = 
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+        this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+        this.tablePath = TablePath.of(databaseName, tableName);
+        this.actionStates = new HashMap<>();
+
+        Configuration flussConf = new Configuration();
+        flussConf.setString("bootstrap.servers", 
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+        String securityProtocol = 
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+        if (securityProtocol != null) {
+            flussConf.setString("client.security.protocol", securityProtocol);
+        }
+        String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+        if (saslMechanism != null) {
+            flussConf.setString("client.security.sasl.mechanism", 
saslMechanism);
+        }
+        String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+        if (jaasConfig != null) {
+            flussConf.setString("client.security.sasl.jaas.config", 
jaasConfig);
+        }
+        String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+        if (username != null) {
+            flussConf.setString("client.security.sasl.username", username);
+        }
+        String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+        if (password != null) {
+            flussConf.setString("client.security.sasl.password", password);
+        }
+
+        this.connection = ConnectionFactory.createConnection(flussConf);
+        maybeCreateDatabaseAndTable();
+        this.table = connection.getTable(tablePath);
+        this.writer = table.newAppend().createWriter();
+
+        LOG.info(
+                "Initialized FlussActionStateStore (log table) with table: 
{}.{}",
+                databaseName,
+                tableName);
+    }
+
+    @Override
+    public void put(Object key, long seqNum, Action action, Event event, 
ActionState state)
+            throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+        GenericRow row = new GenericRow(NUM_COLUMNS);
+        row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+        row.setField(COL_STATE_PAYLOAD, payload);
+        row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+        // Append to Fluss log for durability, then update in-memory cache.
+        // Synchronous write ensures the record is durable before returning.
+        writer.append(row).get();

Review Comment:
   For the current synchronous `put()` semantics, should we set 
`WRITER_BATCH_TIMEOUT` to `0`? Since `put()` calls `writer.append(row).get()`, 
each put waits for the append future to complete. With a non-zero batch 
timeout, the batching delay becomes part of the user-visible put latency.
   



##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static 
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log 
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the 
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ *   <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ *   <li><b>Write</b>: Append to log + update in-memory HashMap.
+ *   <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if 
none) to rebuild
+ *       in-memory state.
+ *   <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on 
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a 
log/append-only table):
+ *
+ * <pre>
+ * state_key     STRING  -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES   -- ActionState JSON serialized bytes
+ * agent_key     STRING  -- Original key; used as bucket key so all records 
for the same
+ *                          agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlussActionStateStore.class);
+    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+    /**
+     * Number of consecutive empty polls before considering the log fully 
consumed during recovery.
+     */
+    private static final int EMPTY_POLL_THRESHOLD = 3;
+
+    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+    /** Creates and configures the ObjectMapper for ActionState serialization. 
*/
+    private static ObjectMapper createObjectMapper() {
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+        return mapper;
+    }
+
+    /** Mixin to add type information for Event hierarchy. */
+    @JsonTypeInfo(
+            use = JsonTypeInfo.Id.CLASS,
+            include = JsonTypeInfo.As.PROPERTY,
+            property = "@class")
+    abstract static class EventTypeInfoMixin {}
+
+    // Column indices in the Fluss table schema
+    private static final int COL_STATE_KEY = 0;
+    private static final int COL_STATE_PAYLOAD = 1;
+    private static final int COL_AGENT_KEY = 2;
+    private static final int NUM_COLUMNS = 3;
+
+    private final AgentConfiguration agentConfiguration;
+    private final String databaseName;
+    private final String tableName;
+    private final TablePath tablePath;
+
+    private final Connection connection;
+    private final Table table;
+    private final AppendWriter writer;
+
+    /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on 
recovery. */
+    private final Map<String, ActionState> actionStates;
+
+    public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+        this.agentConfiguration = agentConfiguration;
+        this.databaseName = 
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+        this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+        this.tablePath = TablePath.of(databaseName, tableName);
+        this.actionStates = new HashMap<>();
+
+        Configuration flussConf = new Configuration();
+        flussConf.setString("bootstrap.servers", 
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+        String securityProtocol = 
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+        if (securityProtocol != null) {
+            flussConf.setString("client.security.protocol", securityProtocol);
+        }
+        String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+        if (saslMechanism != null) {
+            flussConf.setString("client.security.sasl.mechanism", 
saslMechanism);
+        }
+        String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+        if (jaasConfig != null) {
+            flussConf.setString("client.security.sasl.jaas.config", 
jaasConfig);
+        }
+        String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+        if (username != null) {
+            flussConf.setString("client.security.sasl.username", username);
+        }
+        String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+        if (password != null) {
+            flussConf.setString("client.security.sasl.password", password);
+        }
+
+        this.connection = ConnectionFactory.createConnection(flussConf);
+        maybeCreateDatabaseAndTable();
+        this.table = connection.getTable(tablePath);
+        this.writer = table.newAppend().createWriter();
+
+        LOG.info(
+                "Initialized FlussActionStateStore (log table) with table: 
{}.{}",
+                databaseName,
+                tableName);
+    }
+
+    @Override
+    public void put(Object key, long seqNum, Action action, Event event, 
ActionState state)
+            throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+        GenericRow row = new GenericRow(NUM_COLUMNS);
+        row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+        row.setField(COL_STATE_PAYLOAD, payload);
+        row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+        // Append to Fluss log for durability, then update in-memory cache.
+        // Synchronous write ensures the record is durable before returning.
+        writer.append(row).get();
+        actionStates.put(stateKey, state);
+
+        LOG.debug("Stored action state: key={}, isCompleted={}", stateKey, 
state.isCompleted());
+    }
+
+    @Override
+    public ActionState get(Object key, long seqNum, Action action, Event 
event) throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        ActionState state = actionStates.get(stateKey);
+        LOG.debug("Lookup action state: key={}, found={}", stateKey, state != 
null);
+        return state;
+    }
+
+    /**
+     * Rebuilds in-memory state by scanning the Fluss log table. If recovery 
markers are provided,
+     * computes the minimum offset per bucket across all markers and 
subscribes from those offsets.
+     * Otherwise, subscribes from the beginning. Reads until the log is 
exhausted ({@link
+     * #EMPTY_POLL_THRESHOLD} consecutive empty polls). For the same state key 
appearing multiple
+     * times in the log, the latest record wins (last-write-wins).
+     *
+     * <p>Note: Unlike the Kafka implementation which skips rebuild on empty 
markers, this
+     * implementation scans from the beginning when no markers are provided, 
enabling full state
+     * recovery even without prior checkpoints.
+     */
+    @Override
+    public void rebuildState(List<Object> recoveryMarkers) {
+        LOG.info(
+                "Rebuilding action state from Fluss log table with {} recovery 
markers",
+                recoveryMarkers.size());
+        actionStates.clear();
+
+        int numBuckets = table.getTableInfo().getNumBuckets();
+
+        // Compute per-bucket start offsets from recovery markers
+        Map<Integer, Long> bucketOffsets = new HashMap<>();
+        for (Object marker : recoveryMarkers) {
+            if (marker instanceof Map) {
+                @SuppressWarnings("unchecked")
+                Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
+                for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
+                    bucketOffsets.merge(entry.getKey(), entry.getValue(), 
Math::min);
+                }
+            } else if (marker != null) {
+                LOG.warn(
+                        "Ignoring unrecognized recovery marker type: {}",
+                        marker.getClass().getName());
+            }
+        }
+
+        try (LogScanner scanner = table.newScan().createLogScanner()) {
+            if (bucketOffsets.isEmpty()) {
+                for (int b = 0; b < numBuckets; b++) {
+                    scanner.subscribeFromBeginning(b);
+                }
+            } else {
+                for (Map.Entry<Integer, Long> entry : 
bucketOffsets.entrySet()) {
+                    scanner.subscribe(entry.getKey(), entry.getValue());
+                }
+            }
+
+            int emptyPollCount = 0;
+            while (emptyPollCount < EMPTY_POLL_THRESHOLD) {
+                ScanRecords records = scanner.poll(POLL_TIMEOUT);
+                if (records.isEmpty()) {
+                    emptyPollCount++;
+                } else {
+                    emptyPollCount = 0;
+                    for (TableBucket bucket : records.buckets()) {
+                        for (ScanRecord record : records.records(bucket)) {
+                            InternalRow row = record.getRow();
+                            String stateKey = 
row.getString(COL_STATE_KEY).toString();
+                            byte[] payload = row.getBytes(COL_STATE_PAYLOAD);
+                            try {
+                                ActionState state =
+                                        OBJECT_MAPPER.readValue(payload, 
ActionState.class);
+                                actionStates.put(stateKey, state);
+                            } catch (Exception e) {
+                                LOG.warn(
+                                        "Failed to deserialize action state 
for key: {}, skipping",
+                                        stateKey,
+                                        e);
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to rebuild state from Fluss log 
table", e);
+        }
+
+        LOG.info("Completed rebuilding state, recovered {} states", 
actionStates.size());
+    }
+
+    /**
+     * Returns the end offsets of each bucket as a recovery marker. Similar to 
Kafka's
+     * implementation, this captures the current log position so that {@link 
#rebuildState} can
+     * resume from these offsets instead of scanning from the beginning.
+     */
+    @Override
+    public Object getRecoveryMarker() {
+        int numBuckets = table.getTableInfo().getNumBuckets();
+
+        try (Admin admin = connection.getAdmin()) {
+            List<Integer> buckets = new ArrayList<>();
+            for (int b = 0; b < numBuckets; b++) {
+                buckets.add(b);
+            }
+            return new HashMap<>(
+                    admin.listOffsets(tablePath, buckets, new 
OffsetSpec.LatestSpec()).all().get());
+        } catch (Exception e) {
+            LOG.error("Failed to get end offsets for Fluss table: {}", 
tablePath, e);
+            throw new RuntimeException("Failed to get end offsets for Fluss 
table", e);
+        }
+    }
+
+    /**
+     * Evicts pruned states from the in-memory cache. The Fluss log is 
append-only; physical cleanup
+     * relies on Fluss log retention configuration.
+     */
+    @Override
+    public void pruneState(Object key, long seqNum) {
+        LOG.debug("Pruning in-memory state for key: {} up to seqNum: {}", key, 
seqNum);
+
+        actionStates
+                .entrySet()
+                .removeIf(
+                        entry -> {
+                            String stateKey = entry.getKey();
+                            if (stateKey.startsWith(key.toString() + "_")) {
+                                try {
+                                    List<String> parts = 
ActionStateUtil.parseKey(stateKey);
+                                    if (parts.size() >= 2) {
+                                        long stateSeqNum = 
Long.parseLong(parts.get(1));
+                                        return stateSeqNum <= seqNum;
+                                    }
+                                } catch (NumberFormatException e) {
+                                    LOG.warn("Failed to parse seqNum from 
state key: {}", stateKey);
+                                }
+                            }
+                            return false;
+                        });
+
+        LOG.debug("Pruned in-memory state for key: {} up to seqNum: {}", key, 
seqNum);

Review Comment:
   Nit: the debug logs at the beginning and end of this method are almost 
identical
   



##########
runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link FlussActionStateStore} serialization and in-memory 
behavior. These tests
+ * verify the ObjectMapper configuration and ActionState serialization 
round-trips without requiring
+ * a real Fluss cluster.
+ */
+public class FlussActionStateStoreTest {

Review Comment:
   This test is currently closer to an ActionState serde test than a 
`FlussActionStateStore` test which shoud focus on store-level behavior, such as 
`put`, `get`, `rebuildState`, `pruneState` etc.
   



##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static 
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log 
table as the backend.
+ * All state is maintained in an in-memory HashMap for O(1) lookup, with the 
Fluss log table
+ * providing durability and recovery support.
+ *
+ * <p>Compared to the previous KV-table implementation:
+ *
+ * <ul>
+ *   <li><b>Read</b>: O(1) in-memory HashMap lookup; no network I/O per query.
+ *   <li><b>Write</b>: Append to log + update in-memory HashMap.
+ *   <li><b>Recovery</b>: Scan log from recovery markers (or from beginning if 
none) to rebuild
+ *       in-memory state.
+ *   <li><b>Prune</b>: In-memory eviction only; physical cleanup relies on 
Fluss log retention.
+ * </ul>
+ *
+ * <p>The Fluss log table schema (no primary key — this creates a 
log/append-only table):
+ *
+ * <pre>
+ * state_key     STRING  -- Generated by ActionStateUtil.generateKey()
+ * state_payload BYTES   -- ActionState JSON serialized bytes
+ * agent_key     STRING  -- Original key; used as bucket key so all records 
for the same
+ *                          agent are co-located in the same bucket
+ * </pre>
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlussActionStateStore.class);
+    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+    /**
+     * Number of consecutive empty polls before considering the log fully 
consumed during recovery.
+     */
+    private static final int EMPTY_POLL_THRESHOLD = 3;
+
+    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+    /** Creates and configures the ObjectMapper for ActionState serialization. 
*/
+    private static ObjectMapper createObjectMapper() {
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+        mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+        return mapper;
+    }
+
+    /** Mixin to add type information for Event hierarchy. */
+    @JsonTypeInfo(
+            use = JsonTypeInfo.Id.CLASS,
+            include = JsonTypeInfo.As.PROPERTY,
+            property = "@class")
+    abstract static class EventTypeInfoMixin {}
+
+    // Column indices in the Fluss table schema
+    private static final int COL_STATE_KEY = 0;
+    private static final int COL_STATE_PAYLOAD = 1;
+    private static final int COL_AGENT_KEY = 2;
+    private static final int NUM_COLUMNS = 3;
+
+    private final AgentConfiguration agentConfiguration;
+    private final String databaseName;
+    private final String tableName;
+    private final TablePath tablePath;
+
+    private final Connection connection;
+    private final Table table;
+    private final AppendWriter writer;
+
+    /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on 
recovery. */
+    private final Map<String, ActionState> actionStates;
+
+    public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+        this.agentConfiguration = agentConfiguration;
+        this.databaseName = 
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+        this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+        this.tablePath = TablePath.of(databaseName, tableName);
+        this.actionStates = new HashMap<>();
+
+        Configuration flussConf = new Configuration();
+        flussConf.setString("bootstrap.servers", 
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+
+        String securityProtocol = 
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+        if (securityProtocol != null) {
+            flussConf.setString("client.security.protocol", securityProtocol);
+        }
+        String saslMechanism = agentConfiguration.get(FLUSS_SASL_MECHANISM);
+        if (saslMechanism != null) {
+            flussConf.setString("client.security.sasl.mechanism", 
saslMechanism);
+        }
+        String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+        if (jaasConfig != null) {
+            flussConf.setString("client.security.sasl.jaas.config", 
jaasConfig);
+        }
+        String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+        if (username != null) {
+            flussConf.setString("client.security.sasl.username", username);
+        }
+        String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+        if (password != null) {
+            flussConf.setString("client.security.sasl.password", password);
+        }
+
+        this.connection = ConnectionFactory.createConnection(flussConf);
+        maybeCreateDatabaseAndTable();
+        this.table = connection.getTable(tablePath);
+        this.writer = table.newAppend().createWriter();
+
+        LOG.info(
+                "Initialized FlussActionStateStore (log table) with table: 
{}.{}",
+                databaseName,
+                tableName);
+    }
+
+    @Override
+    public void put(Object key, long seqNum, Action action, Event event, 
ActionState state)
+            throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        byte[] payload = OBJECT_MAPPER.writeValueAsBytes(state);
+
+        GenericRow row = new GenericRow(NUM_COLUMNS);
+        row.setField(COL_STATE_KEY, BinaryString.fromString(stateKey));
+        row.setField(COL_STATE_PAYLOAD, payload);
+        row.setField(COL_AGENT_KEY, BinaryString.fromString(key.toString()));
+
+        // Append to Fluss log for durability, then update in-memory cache.
+        // Synchronous write ensures the record is durable before returning.
+        writer.append(row).get();
+        actionStates.put(stateKey, state);
+
+        LOG.debug("Stored action state: key={}, isCompleted={}", stateKey, 
state.isCompleted());
+    }
+
+    @Override
+    public ActionState get(Object key, long seqNum, Action action, Event 
event) throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        ActionState state = actionStates.get(stateKey);
+        LOG.debug("Lookup action state: key={}, found={}", stateKey, state != 
null);
+        return state;
+    }
+
+    /**
+     * Rebuilds in-memory state by scanning the Fluss log table. If recovery 
markers are provided,
+     * computes the minimum offset per bucket across all markers and 
subscribes from those offsets.
+     * Otherwise, subscribes from the beginning. Reads until the log is 
exhausted ({@link
+     * #EMPTY_POLL_THRESHOLD} consecutive empty polls). For the same state key 
appearing multiple
+     * times in the log, the latest record wins (last-write-wins).
+     *
+     * <p>Note: Unlike the Kafka implementation which skips rebuild on empty 
markers, this
+     * implementation scans from the beginning when no markers are provided, 
enabling full state
+     * recovery even without prior checkpoints.
+     */
+    @Override
+    public void rebuildState(List<Object> recoveryMarkers) {
+        LOG.info(
+                "Rebuilding action state from Fluss log table with {} recovery 
markers",
+                recoveryMarkers.size());
+        actionStates.clear();
+
+        int numBuckets = table.getTableInfo().getNumBuckets();
+
+        // Compute per-bucket start offsets from recovery markers
+        Map<Integer, Long> bucketOffsets = new HashMap<>();
+        for (Object marker : recoveryMarkers) {
+            if (marker instanceof Map) {
+                @SuppressWarnings("unchecked")
+                Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
+                for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
+                    bucketOffsets.merge(entry.getKey(), entry.getValue(), 
Math::min);
+                }
+            } else if (marker != null) {
+                LOG.warn(
+                        "Ignoring unrecognized recovery marker type: {}",
+                        marker.getClass().getName());
+            }
+        }
+
+        try (LogScanner scanner = table.newScan().createLogScanner()) {
+            if (bucketOffsets.isEmpty()) {
+                for (int b = 0; b < numBuckets; b++) {
+                    scanner.subscribeFromBeginning(b);
+                }
+            } else {
+                for (Map.Entry<Integer, Long> entry : 
bucketOffsets.entrySet()) {
+                    scanner.subscribe(entry.getKey(), entry.getValue());
+                }
+            }
+
+            int emptyPollCount = 0;
+            while (emptyPollCount < EMPTY_POLL_THRESHOLD) {
+                ScanRecords records = scanner.poll(POLL_TIMEOUT);
+                if (records.isEmpty()) {
+                    emptyPollCount++;
+                } else {
+                    emptyPollCount = 0;
+                    for (TableBucket bucket : records.buckets()) {
+                        for (ScanRecord record : records.records(bucket)) {
+                            InternalRow row = record.getRow();
+                            String stateKey = 
row.getString(COL_STATE_KEY).toString();
+                            byte[] payload = row.getBytes(COL_STATE_PAYLOAD);
+                            try {
+                                ActionState state =
+                                        OBJECT_MAPPER.readValue(payload, 
ActionState.class);
+                                actionStates.put(stateKey, state);
+                            } catch (Exception e) {
+                                LOG.warn(
+                                        "Failed to deserialize action state 
for key: {}, skipping",
+                                        stateKey,
+                                        e);
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to rebuild state from Fluss log 
table", e);
+        }
+
+        LOG.info("Completed rebuilding state, recovered {} states", 
actionStates.size());
+    }
+
+    /**
+     * Returns the end offsets of each bucket as a recovery marker. Similar to 
Kafka's
+     * implementation, this captures the current log position so that {@link 
#rebuildState} can
+     * resume from these offsets instead of scanning from the beginning.
+     */
+    @Override
+    public Object getRecoveryMarker() {
+        int numBuckets = table.getTableInfo().getNumBuckets();
+
+        try (Admin admin = connection.getAdmin()) {
+            List<Integer> buckets = new ArrayList<>();
+            for (int b = 0; b < numBuckets; b++) {
+                buckets.add(b);
+            }
+            return new HashMap<>(
+                    admin.listOffsets(tablePath, buckets, new 
OffsetSpec.LatestSpec()).all().get());
+        } catch (Exception e) {
+            LOG.error("Failed to get end offsets for Fluss table: {}", 
tablePath, e);
+            throw new RuntimeException("Failed to get end offsets for Fluss 
table", e);
+        }
+    }
+
+    /**
+     * Evicts pruned states from the in-memory cache. The Fluss log is 
append-only; physical cleanup
+     * relies on Fluss log retention configuration.
+     */
+    @Override
+    public void pruneState(Object key, long seqNum) {
+        LOG.debug("Pruning in-memory state for key: {} up to seqNum: {}", key, 
seqNum);
+
+        actionStates
+                .entrySet()
+                .removeIf(
+                        entry -> {
+                            String stateKey = entry.getKey();
+                            if (stateKey.startsWith(key.toString() + "_")) {
+                                try {
+                                    List<String> parts = 
ActionStateUtil.parseKey(stateKey);
+                                    if (parts.size() >= 2) {
+                                        long stateSeqNum = 
Long.parseLong(parts.get(1));
+                                        return stateSeqNum <= seqNum;
+                                    }
+                                } catch (NumberFormatException e) {
+                                    LOG.warn("Failed to parse seqNum from 
state key: {}", stateKey);
+                                }
+                            }
+                            return false;
+                        });
+
+        LOG.debug("Pruned in-memory state for key: {} up to seqNum: {}", key, 
seqNum);
+    }
+
+    @Override
+    public void close() throws Exception {
+        Exception firstException = null;

Review Comment:
   Could we simplify this close logic? Given that each `put()` waits for 
`writer.append(row).get()`, I am not sure an extra `writer.flush()` is needed 
here. The `firstException` handling also feels more complex than necessary. 
Maybe we can just close the table and ensure the connection is closed in 
`finally`, letting any close exception propagate normally.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to