This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 3034cf0 Disable automatic topic creation in Kafka. (#3193) 3034cf0 is described below commit 3034cf021f0f344be68a5e23ba2a5a5537a6d4ad Author: Sven Lange-Last <sven.lange-l...@de.ibm.com> AuthorDate: Mon Jan 22 13:43:10 2018 +0100 Disable automatic topic creation in Kafka. (#3193) So far, we had automatic topic creation - the broker would automatically create a topic if a producer sent a message to a topic or a consumer tried to receive messages from a topic. In that case, topic would be created automatically with default settings. These default settings are not always what we need. In the end, this is a race condition during deployment. If we manage to create a topic with desired settings before it gets auto-created with default settings, we are fine. Otherwis [...] This change disables automatic topic creation. --- ansible/roles/kafka/tasks/deploy.yml | 1 + tests/src/test/resources/application.conf | 14 +++++++++++++ .../test/scala/services/KafkaConnectorTests.scala | 24 ++++++++++++++-------- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml index 7c6feda..cf59919 100644 --- a/ansible/roles/kafka/tasks/deploy.yml +++ b/ansible/roles/kafka/tasks/deploy.yml @@ -23,6 +23,7 @@ "KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}" "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}" "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}" + "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "false" ports: - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:9092" diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf index 039f2f4..53f420d 100644 --- a/tests/src/test/resources/application.conf +++ b/tests/src/test/resources/application.conf @@ -8,3 +8,17 @@ whisk.spi { akka.http.client.idle-timeout = 90 s akka.http.host-connection-pool.idle-timeout = 90 s akka.http.host-connection-pool.client.idle-timeout = 90 s + +whisk { + # kafka related configuration + kafka { + replication-factor = 1 + topics { + KafkaConnectorTestTopic { + segment-bytes = 536870912 + retention-bytes = 1073741824 + retention-ms = 3600000 + } + } + } +} diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala index 6001c9c..d0c3019 100644 --- a/tests/src/test/scala/services/KafkaConnectorTests.scala +++ b/tests/src/test/scala/services/KafkaConnectorTests.scala @@ -34,6 +34,7 @@ import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem} import whisk.common.TransactionId import whisk.connector.kafka.KafkaConsumerConnector import whisk.connector.kafka.KafkaProducerConnector +import whisk.connector.kafka.KafkaMessagingProvider import whisk.core.WhiskConfig import whisk.core.connector.Message import whisk.utils.ExecutionContextFactory @@ -48,7 +49,17 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit assert(config.isValid) val groupid = "kafkatest" - val topic = "Dinosaurs" + val topic = "KafkaConnectorTestTopic" + + // Need to overwrite replication factor for tests that shut down and start + // Kafka instances intentionally. These tests will fail if there is more than + // one Kafka host but a replication factor of 1. + val kafkaHosts = config.kafkaHosts.split(",") + val replicationFactor = kafkaHosts.length / 2 + 1 + System.setProperty("whisk.kafka.replication-factor", replicationFactor.toString) + println(s"Create test topic '${topic}' with replicationFactor=${replicationFactor}") + assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic ${topic} failed") + val sessionTimeout = 10 seconds val maxPollInterval = 10 seconds val producer = new KafkaProducerConnector(config.kafkaHosts, ec) @@ -128,9 +139,8 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit } } - it should "send and receive a kafka message even after shutdown one of instances" in { - val kafkaHosts = config.kafkaHosts.split(",") - if (kafkaHosts.length > 1) { + if (kafkaHosts.length > 1) { + it should "send and receive a kafka message even after shutdown one of instances" in { for (i <- 0 until kafkaHosts.length) { val message = new Message { override val serialize = Calendar.getInstance().getTime().toString } val kafkaHost = kafkaHosts(i).split(":")(0) @@ -138,8 +148,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length commandComponent(kafkaHost, "stop", s"kafka$i") - var received = sendAndReceiveMessage(message, 30 seconds, 30 seconds) - received.size should be(1) + sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size (1) consumer.commit() commandComponent(kafkaHost, "start", s"kafka$i") @@ -149,8 +158,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit .length shouldBe prevCount + 1 }, 20, Some(1.second)) // wait until kafka is up - received = sendAndReceiveMessage(message, 30 seconds, 30 seconds) - received.size should be(1) + sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size (1) consumer.commit() } } -- To stop receiving notification emails like this one, please contact markusthoem...@apache.org.