Sxnan commented on code in PR #181:
URL: https://github.com/apache/flink-agents/pull/181#discussion_r2367358405
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java:
##########
@@ -33,38 +57,303 @@
*/
public class KafkaActionStateStore implements ActionStateStore {
- // In memory action state for quick state retrival, this map is only used
during recovery
- private final Map<String, Map<String, ActionState>> keyedActionStates;
+ private static final Duration CONSUMER_POLL_TIMEOUT =
Duration.ofMillis(1000);
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaActionStateStore.class);
+ private static final Long DEFAULT_FUTURE_GET_TIMEOUT_MS = 100L;
+
+ private final AgentConfiguration agentConfiguration;
+
+ // In memory action state for quick state retrieval
+ private final Map<String, ActionState> actionStates;
+
+ // Record the lastest sequence number for each key that should be
considered as valid
+ private final Map<String, Long> latestKeySeqNum;
+
+ // Kafka producer
+ private final Producer<String, ActionState> producer;
+ // Kafka consumer
+ private final Consumer<String, ActionState> consumer;
+
+ // Kafka topic that stores action states
+ private final String topic;
@VisibleForTesting
- KafkaActionStateStore(Map<String, Map<String, ActionState>>
keyedActionStates) {
- this.keyedActionStates = keyedActionStates;
+ KafkaActionStateStore(
+ Map<String, ActionState> actionStates,
+ AgentConfiguration agentConfiguration,
+ Producer<String, ActionState> producer,
+ Consumer<String, ActionState> consumer,
+ String topic) {
+ this.actionStates = actionStates;
+ this.producer = producer;
+ this.consumer = consumer;
+ this.topic = topic;
+ this.latestKeySeqNum = new HashMap<>();
+ this.agentConfiguration = agentConfiguration;
}
- /** Constructs a new KafkaActionStateStore with an empty in-memory action
state map. */
- public KafkaActionStateStore() {
- this(new HashMap<>());
+ /** Constructs a new KafkaActionStateStore with custom Kafka
configuration. */
+ public KafkaActionStateStore(AgentConfiguration agentConfiguration) {
+ this.actionStates = new HashMap<>();
+ this.latestKeySeqNum = new HashMap<>();
+ this.agentConfiguration = agentConfiguration;
+ this.topic = agentConfiguration.get(KAFKA_ACTION_STATE_TOPIC);
+ // create the topic if not exists
+ maybeCreateTopic();
Review Comment:
Should we support creating the topic if it does not exist, or should we require
that the topic be created beforehand?
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaDeserializer.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.runtime.operator.ActionTask;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Kafka deserializer for {@link ActionState}.
+ *
+ * <p>This deserializer handles the deserialization of byte arrays from Kafka
back to ActionState
+ * instances. It uses Jackson ObjectMapper with custom deserializers to handle
polymorphic Event
+ * types and ensures ActionTask is deserialized as null.
+ */
+public class ActionStateKafkaDeserializer implements Deserializer<ActionState>
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ActionStateKafkaDeserializer.class);
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // No configuration needed
+ }
+
+ @Override
+ public ActionState deserialize(String topic, byte[] data) {
+ if (data == null) {
+ return null;
+ }
+
+ try {
+ return OBJECT_MAPPER.readValue(data, ActionState.class);
+ } catch (Exception e) {
+ LOG.error("Failed to deserialize ActionState for topic: {}",
topic, e);
+ throw new RuntimeException("Failed to deserialize ActionState", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ // No resources to close
+ }
+
+ /** Creates and configures the ObjectMapper for ActionState
deserialization. */
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+
+ // Add type information for polymorphic Event deserialization
+ mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
+ mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
+
+ // Create a module for custom deserializers
+ SimpleModule module = new SimpleModule();
+
+ // Custom deserializer for ActionTask - always deserialize as null
+ module.addDeserializer(ActionTask.class, new ActionTaskDeserializer());
+
+ mapper.registerModule(module);
+
+ return mapper;
+ }
+
+ /** Mixin to add type information for Event hierarchy. */
+ @JsonTypeInfo(
+ use = JsonTypeInfo.Id.CLASS,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "@class")
+ public abstract static class EventTypeInfoMixin {}
Review Comment:
This mixin is identical to the one in `ActionStateKafkaSerializer`; can we reuse a single shared definition?
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java:
##########
@@ -24,7 +24,7 @@
import java.util.List;
/** Interface for storing and retrieving the state of actions performed by
agents. */
-public interface ActionStateStore {
+public interface ActionStateStore extends AutoCloseable {
Review Comment:
The `close` method is never called. Should it be called in
`ActionExecutionOperator` (e.g. when the operator is closed)?
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java:
##########
@@ -33,38 +57,303 @@
*/
public class KafkaActionStateStore implements ActionStateStore {
- // In memory action state for quick state retrival, this map is only used
during recovery
- private final Map<String, Map<String, ActionState>> keyedActionStates;
+ private static final Duration CONSUMER_POLL_TIMEOUT =
Duration.ofMillis(1000);
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaActionStateStore.class);
+ private static final Long DEFAULT_FUTURE_GET_TIMEOUT_MS = 100L;
+
+ private final AgentConfiguration agentConfiguration;
+
+ // In memory action state for quick state retrieval
+ private final Map<String, ActionState> actionStates;
+
+ // Record the lastest sequence number for each key that should be
considered as valid
+ private final Map<String, Long> latestKeySeqNum;
+
+ // Kafka producer
+ private final Producer<String, ActionState> producer;
+ // Kafka consumer
+ private final Consumer<String, ActionState> consumer;
+
+ // Kafka topic that stores action states
+ private final String topic;
@VisibleForTesting
- KafkaActionStateStore(Map<String, Map<String, ActionState>>
keyedActionStates) {
- this.keyedActionStates = keyedActionStates;
+ KafkaActionStateStore(
+ Map<String, ActionState> actionStates,
+ AgentConfiguration agentConfiguration,
+ Producer<String, ActionState> producer,
+ Consumer<String, ActionState> consumer,
+ String topic) {
+ this.actionStates = actionStates;
+ this.producer = producer;
+ this.consumer = consumer;
+ this.topic = topic;
+ this.latestKeySeqNum = new HashMap<>();
+ this.agentConfiguration = agentConfiguration;
}
- /** Constructs a new KafkaActionStateStore with an empty in-memory action
state map. */
- public KafkaActionStateStore() {
- this(new HashMap<>());
+ /** Constructs a new KafkaActionStateStore with custom Kafka
configuration. */
+ public KafkaActionStateStore(AgentConfiguration agentConfiguration) {
+ this.actionStates = new HashMap<>();
+ this.latestKeySeqNum = new HashMap<>();
+ this.agentConfiguration = agentConfiguration;
+ this.topic = agentConfiguration.get(KAFKA_ACTION_STATE_TOPIC);
+ // create the topic if not exists
+ maybeCreateTopic();
+ Properties producerProp = createProducerProp();
+ this.producer = new KafkaProducer<>(producerProp);
+ Properties consumerProp = createConsumerProp();
+ this.consumer = new KafkaConsumer<>(consumerProp);
+ consumer.subscribe(Collections.singletonList(topic));
+ LOG.info("Initialized KafkaActionStateStore with topic: {}", topic);
}
@Override
public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
- throws IOException {
- // TODO: Implement me
+ throws Exception {
+ if (producer == null) {
+ LOG.error("Producer is null, cannot put action state to Kafka");
+ return;
+ }
+
+ String stateKey = generateKey(key, seqNum, action, event);
+ try {
+ ProducerRecord<String, ActionState> kafkaRecord =
+ new ProducerRecord<>(topic, stateKey, state);
+ Future<RecordMetadata> recordFuture = producer.send(kafkaRecord);
+ RecordMetadata recordMetadata =
+ recordFuture.get(DEFAULT_FUTURE_GET_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
+ LOG.debug("Sent action state to Kafka for key: {}", stateKey);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to send action state to Kafka",
e);
+ }
}
@Override
- public ActionState get(Object key, long seqNum, Action action, Event
event) throws IOException {
- // TODO: Implement me
- return null;
+ public ActionState get(Object key, long seqNum, Action action, Event
event) throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+
+ boolean hasDivergence = checkDivergence(key.toString(), seqNum);
Review Comment:
Can you elaborate on the cases in which divergence can occur? Why do we need this check?
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java:
##########
@@ -33,38 +57,303 @@
*/
public class KafkaActionStateStore implements ActionStateStore {
- // In memory action state for quick state retrival, this map is only used
during recovery
- private final Map<String, Map<String, ActionState>> keyedActionStates;
+ private static final Duration CONSUMER_POLL_TIMEOUT =
Duration.ofMillis(1000);
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaActionStateStore.class);
+ private static final Long DEFAULT_FUTURE_GET_TIMEOUT_MS = 100L;
+
+ private final AgentConfiguration agentConfiguration;
+
+ // In memory action state for quick state retrieval
+ private final Map<String, ActionState> actionStates;
+
+ // Record the lastest sequence number for each key that should be
considered as valid
+ private final Map<String, Long> latestKeySeqNum;
+
+ // Kafka producer
+ private final Producer<String, ActionState> producer;
+ // Kafka consumer
+ private final Consumer<String, ActionState> consumer;
+
+ // Kafka topic that stores action states
+ private final String topic;
@VisibleForTesting
- KafkaActionStateStore(Map<String, Map<String, ActionState>>
keyedActionStates) {
- this.keyedActionStates = keyedActionStates;
+ KafkaActionStateStore(
+ Map<String, ActionState> actionStates,
+ AgentConfiguration agentConfiguration,
+ Producer<String, ActionState> producer,
+ Consumer<String, ActionState> consumer,
+ String topic) {
+ this.actionStates = actionStates;
+ this.producer = producer;
+ this.consumer = consumer;
+ this.topic = topic;
+ this.latestKeySeqNum = new HashMap<>();
+ this.agentConfiguration = agentConfiguration;
}
- /** Constructs a new KafkaActionStateStore with an empty in-memory action
state map. */
- public KafkaActionStateStore() {
- this(new HashMap<>());
+ /** Constructs a new KafkaActionStateStore with custom Kafka
configuration. */
+ public KafkaActionStateStore(AgentConfiguration agentConfiguration) {
+ this.actionStates = new HashMap<>();
+ this.latestKeySeqNum = new HashMap<>();
+ this.agentConfiguration = agentConfiguration;
+ this.topic = agentConfiguration.get(KAFKA_ACTION_STATE_TOPIC);
+ // create the topic if not exists
+ maybeCreateTopic();
+ Properties producerProp = createProducerProp();
+ this.producer = new KafkaProducer<>(producerProp);
+ Properties consumerProp = createConsumerProp();
+ this.consumer = new KafkaConsumer<>(consumerProp);
+ consumer.subscribe(Collections.singletonList(topic));
+ LOG.info("Initialized KafkaActionStateStore with topic: {}", topic);
}
@Override
public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
- throws IOException {
- // TODO: Implement me
+ throws Exception {
+ if (producer == null) {
+ LOG.error("Producer is null, cannot put action state to Kafka");
+ return;
+ }
+
+ String stateKey = generateKey(key, seqNum, action, event);
+ try {
+ ProducerRecord<String, ActionState> kafkaRecord =
+ new ProducerRecord<>(topic, stateKey, state);
+ Future<RecordMetadata> recordFuture = producer.send(kafkaRecord);
+ RecordMetadata recordMetadata =
Review Comment:
The `recordMetadata` is unused.
##########
tools/lint.sh:
##########
@@ -67,7 +67,7 @@ install_python_deps() {
if command -v uv >/dev/null 2>&1; then
echo "Using uv for dependency management"
pushd python
- uv sync --extra lint
+ uv sync --extra lint --index-strategy unsafe-best-match
Review Comment:
Why do we need this change?
##########
api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java:
##########
@@ -27,4 +27,20 @@ public class AgentConfigOptions {
/** The config parameter specifies the backend for action state store. */
public static final ConfigOption<String> ACTION_STATE_STORE_BACKEND =
new ConfigOption<>("actionStateStoreBackend", String.class, null);
+
+ /** The config parameter specifies the Kafka bootstrap server. */
+ public static final ConfigOption<String> KAFKA_BOOTSTRAP_SERVERS =
+ new ConfigOption<>("kafkaBootstrapServers", String.class,
"localhost:9092");
+
+ /** The config parameter specifies the Kafka topic for action state. */
+ public static final ConfigOption<String> KAFKA_ACTION_STATE_TOPIC =
+ new ConfigOption<>("kafkaActionStateTopic", String.class,
"flink-agents-action-state");
+
+ /** The config parameter sepcifies the number of partitions for the Kafka
action state topic. */
+ public static final ConfigOption<Integer>
KAFKA_ACTION_STATE_TOPIC_NUM_PARTITIONS =
Review Comment:
Why do we need to specify the number of partitions? Should we allow creating
the Kafka topic while the job is running at all?
##########
api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java:
##########
@@ -27,4 +27,20 @@ public class AgentConfigOptions {
/** The config parameter specifies the backend for action state store. */
public static final ConfigOption<String> ACTION_STATE_STORE_BACKEND =
new ConfigOption<>("actionStateStoreBackend", String.class, null);
+
+ /** The config parameter specifies the Kafka bootstrap server. */
+ public static final ConfigOption<String> KAFKA_BOOTSTRAP_SERVERS =
+ new ConfigOption<>("kafkaBootstrapServers", String.class,
"localhost:9092");
+
+ /** The config parameter specifies the Kafka topic for action state. */
+ public static final ConfigOption<String> KAFKA_ACTION_STATE_TOPIC =
+ new ConfigOption<>("kafkaActionStateTopic", String.class,
"flink-agents-action-state");
Review Comment:
I think it is better to use `null` as the default value and throw an
exception if the topic is not configured. Otherwise, multiple jobs that rely on
the default would silently share the same topic, which is problematic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]