Sxnan commented on code in PR #181:
URL: https://github.com/apache/flink-agents/pull/181#discussion_r2371306579
##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java:
##########
@@ -33,38 +57,303 @@
*/
public class KafkaActionStateStore implements ActionStateStore {
- // In memory action state for quick state retrival, this map is only used
during recovery
- private final Map<String, Map<String, ActionState>> keyedActionStates;
+ private static final Duration CONSUMER_POLL_TIMEOUT =
Duration.ofMillis(1000);
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaActionStateStore.class);
+ private static final Long DEFAULT_FUTURE_GET_TIMEOUT_MS = 100L;
+
+ private final AgentConfiguration agentConfiguration;
+
+ // In memory action state for quick state retrieval
+ private final Map<String, ActionState> actionStates;
+
+ // Record the latest sequence number for each key that should be
considered as valid
+ private final Map<String, Long> latestKeySeqNum;
+
+ // Kafka producer
+ private final Producer<String, ActionState> producer;
+ // Kafka consumer
+ private final Consumer<String, ActionState> consumer;
+
+ // Kafka topic that stores action states
+ private final String topic;
@VisibleForTesting
- KafkaActionStateStore(Map<String, Map<String, ActionState>>
keyedActionStates) {
- this.keyedActionStates = keyedActionStates;
+ KafkaActionStateStore(
+ Map<String, ActionState> actionStates,
+ AgentConfiguration agentConfiguration,
+ Producer<String, ActionState> producer,
+ Consumer<String, ActionState> consumer,
+ String topic) {
+ this.actionStates = actionStates;
+ this.producer = producer;
+ this.consumer = consumer;
+ this.topic = topic;
+ this.latestKeySeqNum = new HashMap<>();
+ this.agentConfiguration = agentConfiguration;
}
- /** Constructs a new KafkaActionStateStore with an empty in-memory action
state map. */
- public KafkaActionStateStore() {
- this(new HashMap<>());
+ /** Constructs a new KafkaActionStateStore with custom Kafka
configuration. */
+ public KafkaActionStateStore(AgentConfiguration agentConfiguration) {
+ this.actionStates = new HashMap<>();
+ this.latestKeySeqNum = new HashMap<>();
+ this.agentConfiguration = agentConfiguration;
+ this.topic = agentConfiguration.get(KAFKA_ACTION_STATE_TOPIC);
+ // create the topic if not exists
+ maybeCreateTopic();
+ Properties producerProp = createProducerProp();
+ this.producer = new KafkaProducer<>(producerProp);
+ Properties consumerProp = createConsumerProp();
+ this.consumer = new KafkaConsumer<>(consumerProp);
+ consumer.subscribe(Collections.singletonList(topic));
+ LOG.info("Initialized KafkaActionStateStore with topic: {}", topic);
}
@Override
public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
- throws IOException {
- // TODO: Implement me
+ throws Exception {
+ if (producer == null) {
+ LOG.error("Producer is null, cannot put action state to Kafka");
+ return;
+ }
+
+ String stateKey = generateKey(key, seqNum, action, event);
+ try {
+ ProducerRecord<String, ActionState> kafkaRecord =
+ new ProducerRecord<>(topic, stateKey, state);
+ Future<RecordMetadata> recordFuture = producer.send(kafkaRecord);
+ RecordMetadata recordMetadata =
+ recordFuture.get(DEFAULT_FUTURE_GET_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
+ LOG.debug("Sent action state to Kafka for key: {}", stateKey);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to send action state to Kafka",
e);
+ }
}
@Override
- public ActionState get(Object key, long seqNum, Action action, Event
event) throws IOException {
- // TODO: Implement me
- return null;
+ public ActionState get(Object key, long seqNum, Action action, Event
event) throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+
+ boolean hasDivergence = checkDivergence(key.toString(), seqNum);
Review Comment:
Thanks for the explanation! LGTM
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]