This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 364ace7a7 Fix error handling and partition config bugs in
KafkaMessagingProvider.ensureTopic (#5527)
364ace7a7 is described below
commit 364ace7a7e670815aa0cf8fccb3de081fa3ad1b9
Author: Quinten Parker <[email protected]>
AuthorDate: Tue Jan 14 13:56:43 2025 -0800
Fix error handling and partition config bugs in
KafkaMessagingProvider.ensureTopic (#5527)
---
.../openwhisk/connector/kafka/KafkaMessagingProvider.scala | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
index f61f6ec7a..dd3889527 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
@@ -41,6 +41,8 @@ case class KafkaConfig(replicationFactor: Short,
consumerLagCheckInterval: Finit
object KafkaMessagingProvider extends MessagingProvider {
import KafkaConfiguration._
+ private val topicPartitionsConfigKey = "partitions"
+
def getConsumer(config: WhiskConfig, groupId: String, topic: String,
maxPeek: Int, maxPollInterval: FiniteDuration)(
implicit logging: Logging,
actorSystem: ActorSystem): MessageConsumer =
@@ -64,12 +66,13 @@ object KafkaMessagingProvider extends MessagingProvider {
Try(AdminClient.create(commonConfig +
(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
.flatMap(client => {
- val partitions = topicConfig.getOrElse("partitions", "1").toInt
- val nt = new NewTopic(topic, partitions,
kafkaConfig.replicationFactor).configs(topicConfig.asJava)
+ val partitions = topicConfig.getOrElse(topicPartitionsConfigKey,
"1").toInt
+ val safeTopicConfig = topicConfig - topicPartitionsConfigKey
+ val nt = new NewTopic(topic, partitions,
kafkaConfig.replicationFactor).configs(safeTopicConfig.asJava)
def createTopic(retries: Int = 5): Try[Unit] = {
Try(client.listTopics().names().get())
- .map(topics =>
+ .flatMap(topics =>
if (topics.contains(topic)) {
Success(logging.info(this, s"$topic already exists and the
user can see it, skipping creation."))
} else {