This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 3034cf0  Disable automatic topic creation in Kafka. (#3193)
3034cf0 is described below

commit 3034cf021f0f344be68a5e23ba2a5a5537a6d4ad
Author: Sven Lange-Last <sven.lange-l...@de.ibm.com>
AuthorDate: Mon Jan 22 13:43:10 2018 +0100

    Disable automatic topic creation in Kafka. (#3193)
    
    So far, we had automatic topic creation - the broker would automatically 
create a topic if a producer sent a message to a topic or a consumer tried to 
receive messages from a topic. In that case, the topic would be created 
automatically with default settings. These default settings are not always what 
we need. In the end, this is a race condition during deployment. If we manage 
to create a topic with the desired settings before it gets auto-created with 
default settings, we are fine. Otherwis [...]
    
    This change disables automatic topic creation.
---
 ansible/roles/kafka/tasks/deploy.yml               |  1 +
 tests/src/test/resources/application.conf          | 14 +++++++++++++
 .../test/scala/services/KafkaConnectorTests.scala  | 24 ++++++++++++++--------
 3 files changed, 31 insertions(+), 8 deletions(-)

diff --git a/ansible/roles/kafka/tasks/deploy.yml 
b/ansible/roles/kafka/tasks/deploy.yml
index 7c6feda..cf59919 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -23,6 +23,7 @@
       "KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
       "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
       "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+      "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "false"
     ports:
       - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:9092"
 
diff --git a/tests/src/test/resources/application.conf 
b/tests/src/test/resources/application.conf
index 039f2f4..53f420d 100644
--- a/tests/src/test/resources/application.conf
+++ b/tests/src/test/resources/application.conf
@@ -8,3 +8,17 @@ whisk.spi {
 akka.http.client.idle-timeout = 90 s
 akka.http.host-connection-pool.idle-timeout = 90 s
 akka.http.host-connection-pool.client.idle-timeout = 90 s
+
+whisk {
+    # kafka related configuration
+    kafka {
+        replication-factor = 1
+        topics {
+            KafkaConnectorTestTopic {
+                segment-bytes   =  536870912
+                retention-bytes = 1073741824
+                retention-ms    = 3600000
+            }
+        }
+    }
+}
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala 
b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 6001c9c..d0c3019 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -34,6 +34,7 @@ import common.{StreamLogging, TestUtils, WhiskProperties, 
WskActorSystem}
 import whisk.common.TransactionId
 import whisk.connector.kafka.KafkaConsumerConnector
 import whisk.connector.kafka.KafkaProducerConnector
+import whisk.connector.kafka.KafkaMessagingProvider
 import whisk.core.WhiskConfig
 import whisk.core.connector.Message
 import whisk.utils.ExecutionContextFactory
@@ -48,7 +49,17 @@ class KafkaConnectorTests extends FlatSpec with Matchers 
with WskActorSystem wit
   assert(config.isValid)
 
   val groupid = "kafkatest"
-  val topic = "Dinosaurs"
+  val topic = "KafkaConnectorTestTopic"
+
+  // Need to overwrite replication factor for tests that shut down and start
+  // Kafka instances intentionally. These tests will fail if there is more than
+  // one Kafka host but a replication factor of 1.
+  val kafkaHosts = config.kafkaHosts.split(",")
+  val replicationFactor = kafkaHosts.length / 2 + 1
+  System.setProperty("whisk.kafka.replication-factor", 
replicationFactor.toString)
+  println(s"Create test topic '${topic}' with 
replicationFactor=${replicationFactor}")
+  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation 
of topic ${topic} failed")
+
   val sessionTimeout = 10 seconds
   val maxPollInterval = 10 seconds
   val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
@@ -128,9 +139,8 @@ class KafkaConnectorTests extends FlatSpec with Matchers 
with WskActorSystem wit
     }
   }
 
-  it should "send and receive a kafka message even after shutdown one of 
instances" in {
-    val kafkaHosts = config.kafkaHosts.split(",")
-    if (kafkaHosts.length > 1) {
+  if (kafkaHosts.length > 1) {
+    it should "send and receive a kafka message even after shutdown one of 
instances" in {
       for (i <- 0 until kafkaHosts.length) {
         val message = new Message { override val serialize = 
Calendar.getInstance().getTime().toString }
         val kafkaHost = kafkaHosts(i).split(":")(0)
@@ -138,8 +148,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers 
with WskActorSystem wit
         val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, 
"logs", s"kafka$i").stdout).length
 
         commandComponent(kafkaHost, "stop", s"kafka$i")
-        var received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
-        received.size should be(1)
+        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have 
size (1)
         consumer.commit()
 
         commandComponent(kafkaHost, "start", s"kafka$i")
@@ -149,8 +158,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers 
with WskActorSystem wit
             .length shouldBe prevCount + 1
         }, 20, Some(1.second)) // wait until kafka is up
 
-        received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
-        received.size should be(1)
+        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have 
size (1)
         consumer.commit()
       }
     }

-- 
To stop receiving notification emails like this one, please contact
markusthoem...@apache.org.

Reply via email to