This is an automated email from the ASF dual-hosted git repository.
sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 42a7e68 [hotfix] Fix the unsupported short type configuration option
(#213)
42a7e68 is described below
commit 42a7e68992d71edc0a41387ca7220cf9d1ba6c43
Author: Eugene <[email protected]>
AuthorDate: Thu Sep 25 20:11:00 2025 +0800
[hotfix] Fix the unsupported short type configuration option (#213)
This closes #212.
---
.../apache/flink/agents/api/configuration/AgentConfigOptions.java | 4 ++--
.../flink/agents/runtime/actionstate/KafkaActionStateStore.java | 5 +++--
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index 987d007..5efe2b8 100644
---
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -41,6 +41,6 @@ public class AgentConfigOptions {
new ConfigOption<>("kafkaActionStateTopicNumPartitions",
Integer.class, 64);
/** The config parameter specifies the replication factor for the Kafka
action state topic. */
- public static final ConfigOption<Short>
KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR =
- new ConfigOption<>("kafkaActionStateTopicReplicationFactor",
Short.class, (short) 1);
+ public static final ConfigOption<Integer>
KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR =
+ new ConfigOption<>("kafkaActionStateTopicReplicationFactor",
Integer.class, 1);
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
index 25e585b..7e35219 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
@@ -311,8 +311,9 @@ public class KafkaActionStateStore implements
ActionStateStore {
new NewTopic(
topic,
agentConfiguration.get(KAFKA_ACTION_STATE_TOPIC_NUM_PARTITIONS),
- agentConfiguration.get(
-
KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR));
+ agentConfiguration
+
.get(KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR)
+ .shortValue());
// enable topic compaction
newTopic.configs(Map.of("cleanup.policy", "compact"));
adminClient.createTopics(List.of(newTopic)).all().get();