This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 364ace7a7 Fix error handling and partition config bugs in 
KafkaMessagingProvider.ensureTopic (#5527)
364ace7a7 is described below

commit 364ace7a7e670815aa0cf8fccb3de081fa3ad1b9
Author: Quinten Parker <[email protected]>
AuthorDate: Tue Jan 14 13:56:43 2025 -0800

    Fix error handling and partition config bugs in 
KafkaMessagingProvider.ensureTopic (#5527)
---
 .../openwhisk/connector/kafka/KafkaMessagingProvider.scala       | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
index f61f6ec7a..dd3889527 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
@@ -41,6 +41,8 @@ case class KafkaConfig(replicationFactor: Short, 
consumerLagCheckInterval: Finit
 object KafkaMessagingProvider extends MessagingProvider {
   import KafkaConfiguration._
 
+  private val topicPartitionsConfigKey = "partitions"
+
   def getConsumer(config: WhiskConfig, groupId: String, topic: String, 
maxPeek: Int, maxPollInterval: FiniteDuration)(
     implicit logging: Logging,
     actorSystem: ActorSystem): MessageConsumer =
@@ -64,12 +66,13 @@ object KafkaMessagingProvider extends MessagingProvider {
 
     Try(AdminClient.create(commonConfig + 
(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
       .flatMap(client => {
-        val partitions = topicConfig.getOrElse("partitions", "1").toInt
-        val nt = new NewTopic(topic, partitions, 
kafkaConfig.replicationFactor).configs(topicConfig.asJava)
+        val partitions = topicConfig.getOrElse(topicPartitionsConfigKey, 
"1").toInt
+        val safeTopicConfig = topicConfig - topicPartitionsConfigKey
+        val nt = new NewTopic(topic, partitions, 
kafkaConfig.replicationFactor).configs(safeTopicConfig.asJava)
 
         def createTopic(retries: Int = 5): Try[Unit] = {
           Try(client.listTopics().names().get())
-            .map(topics =>
+            .flatMap(topics =>
               if (topics.contains(topic)) {
                 Success(logging.info(this, s"$topic already exists and the 
user can see it, skipping creation."))
               } else {

Reply via email to