This is an automated email from the ASF dual-hosted git repository.

sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 42a7e68  [hotfix] Fix the unsupported short type configuration option (#213)
42a7e68 is described below

commit 42a7e68992d71edc0a41387ca7220cf9d1ba6c43
Author: Eugene <[email protected]>
AuthorDate: Thu Sep 25 20:11:00 2025 +0800

    [hotfix] Fix the unsupported short type configuration option (#213)
    
    This closes #212.
---
 .../apache/flink/agents/api/configuration/AgentConfigOptions.java    | 4 ++--
 .../flink/agents/runtime/actionstate/KafkaActionStateStore.java      | 5 +++--
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index 987d007..5efe2b8 100644
--- a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++ b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -41,6 +41,6 @@ public class AgentConfigOptions {
             new ConfigOption<>("kafkaActionStateTopicNumPartitions", Integer.class, 64);
 
     /** The config parameter specifies the replication factor for the Kafka action state topic. */
-    public static final ConfigOption<Short> KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR =
-            new ConfigOption<>("kafkaActionStateTopicReplicationFactor", Short.class, (short) 1);
+    public static final ConfigOption<Integer> KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR =
+            new ConfigOption<>("kafkaActionStateTopicReplicationFactor", Integer.class, 1);
 }
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
index 25e585b..7e35219 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
@@ -311,8 +311,9 @@ public class KafkaActionStateStore implements ActionStateStore {
                         new NewTopic(
                                 topic,
                                 agentConfiguration.get(KAFKA_ACTION_STATE_TOPIC_NUM_PARTITIONS),
-                                agentConfiguration.get(
-                                        KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR));
+                                agentConfiguration
+                                        .get(KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR)
+                                        .shortValue());
                 // enable topic compaction
                 newTopic.configs(Map.of("cleanup.policy", "compact"));
                 adminClient.createTopics(List.of(newTopic)).all().get();

Reply via email to