This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 566990c  Do not skip last sequence if an exception occurs (#364)
566990c is described below

commit 566990cfb1c113877b318857036f9151bc4430d1
Author: James Dubee <jwdu...@us.ibm.com>
AuthorDate: Mon Jan 6 14:21:43 2020 -0500

    Do not skip last sequence if an exception occurs (#364)
---
 provider/service.py                                | 25 ++++++---
 .../system/packages/MessageHubFeedTests.scala      | 61 +++++++++++++++++++++-
 2 files changed, 78 insertions(+), 8 deletions(-)
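
The gist of the provider/service.py change: the changes-feed loop used to record
self.lastSequence as soon as a change arrived, before the change was processed.
If processing then raised an exception, the restarted feed resumed after the
failed change and silently skipped it. Moving the assignment to the end of the
iteration means a restarted feed replays the failed change instead. A minimal,
self-contained sketch of the ordering (consume, handle, and the toy feed below
are illustrative only, not the provider's actual API):

    import logging

    def consume(feed, handle):
        """Drain a changes feed; return the sequence to resume from."""
        last_sequence = None
        for change in feed:
            try:
                handle(change)  # may raise mid-processing
                # Advance the resume point only once the change has been
                # fully handled, so a failed change is replayed on restart.
                last_sequence = change['seq']
            except Exception:
                logging.exception('changes feed failed; restart from %s',
                                  last_sequence)
                break
        return last_sequence

    # With the old ordering (record seq, then handle), seq 2 would be
    # skipped after a restart; with this ordering it is replayed.
    feed = [{'seq': 1, 'id': 'a'}, {'seq': 2, 'id': 'boom'}]
    resume = consume(feed, lambda c: 1 / 0 if c['id'] == 'boom' else None)
    assert resume == 1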

diff --git a/provider/service.py b/provider/service.py
index fa8e109..b8a272f 100644
--- a/provider/service.py
+++ b/provider/service.py
@@ -68,11 +68,6 @@ class Service (Thread):
                     # check whether or not the feed is capable of detecting canary
                     # documents
                     if change != None:
-                        # Record the sequence in case the changes feed needs to be
-                        # restarted. This way the new feed can pick up right where
-                        # the old one left off.
-                        self.lastSequence = change['seq']
-
                         if "deleted" in change and change["deleted"] == True:
                             logging.info('[changes] Found a delete')
                             consumer = self.consumers.getConsumerForTrigger(change['id'])
@@ -109,7 +104,20 @@ class Service (Thread):
                                 elif triggerIsAssignedToMe:
                                     logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))
 
-                                    if existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
+                                    if existingConsumer.desiredState() == Consumer.State.Dead and self.__isTriggerDocActive(document):
+                                        # if a delete occurs followed quickly by a create the consumer might get stuck in a dead state,
+                                        # so we need to forcefully delete the process before recreating it.
+                                        logging.info('[{}] A create event occurred for a trigger that is shutting down'.format(change["id"]))
+
+                                        if existingConsumer.process.is_alive():
+                                            logging.info('[{}] Joining dead process.'.format(existingConsumer.trigger))
+                                            existingConsumer.process.join(1)
+                                        else:
+                                            logging.info('[{}] Process is already dead.'.format(existingConsumer.trigger))
+
+                                        self.consumers.removeConsumerForTrigger(existingConsumer.trigger)
+                                        self.createAndRunConsumer(document)
+                                    elif existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
                                         # disabled trigger has become active
                                         logging.info('[{}] Existing disabled trigger should become active'.format(change["id"]))
                                         self.createAndRunConsumer(document)
@@ -123,6 +131,11 @@ class Service (Thread):
                             self.lastCanaryTime = datetime.now()
                         else:
                             logging.debug('[changes] Found a change for a non-trigger document')
+
+                        # Record the sequence in case the changes feed needs to be
+                        # restarted. This way the new feed can pick up right where
+                        # the old one left off.
+                        self.lastSequence = change['seq']
             except Exception as e:
                logging.error('[canary] Exception caught from changes feed. Restarting changes feed...')
                 logging.error(e)
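
For context, self.lastSequence is the point a restarted feed resumes from. The
provider follows a Cloudant/CouchDB _changes feed, whose since parameter
re-delivers everything after the given sequence, so resuming with the last
successfully processed sequence replays a change that failed mid-flight. A
generic sketch against the raw CouchDB HTTP API (db_url is a placeholder; the
provider goes through its own feed client rather than requests):

    import json
    import requests

    def follow_changes(db_url, since=0):
        # feed=continuous keeps the HTTP response open and streams one
        # JSON-encoded change per line; since=N skips already-seen changes.
        resp = requests.get('%s/_changes' % db_url,
                            params={'feed': 'continuous', 'since': since},
                            stream=True)
        for line in resp.iter_lines():
            if line:
                yield json.loads(line)
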
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 0c05d63..9974bf5 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -16,10 +16,9 @@
  */
 package system.packages
 
-import system.utils.KafkaUtils
-
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
+import system.utils.KafkaUtils
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
@@ -40,6 +39,8 @@ import common.TestUtils.NOT_FOUND
 import org.apache.openwhisk.utils.retry
 import org.apache.openwhisk.core.entity.Annotations
 import java.util.concurrent.ExecutionException
+import common.ActivationResult
+import common.TestUtils.SUCCESS_EXIT
 
 @RunWith(classOf[JUnitRunner])
 class MessageHubFeedTests
@@ -113,6 +114,62 @@ class MessageHubFeedTests
    runActionWithExpectedResult(actionName, "dat/multipleValueTypes.json", expectedOutput, false)
   }
 
+  it should "create a trigger, delete that trigger, and quickly create it 
again with successful trigger fires" in withAssetCleaner(wskprops) {
+    val currentTime = s"${System.currentTimeMillis}"
+
+    (wp, assetHelper) =>
+      val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+      val ruleName = s"dummyMessageHub-helloKafka-$currentTime"
+      val parameters = Map(
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "api_key" -> getAsJson("api_key"),
+        "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
+        "topic" -> topic.toJson
+      )
+
+      val key = "TheKey"
+      val verificationName = s"trigger-$currentTime"
+      val defaultAction = Some("dat/createTriggerActions.js")
+      val defaultActionName = s"helloKafka-$currentTime"
+
+      createTrigger(assetHelper, triggerName, parameters)
+
+      assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
+        action.create(name, defaultAction, annotations = Map(Annotations.ProvideApiKeyAnnotationName -> JsBoolean(true)))
+      }
+
+      assetHelper.withCleaner(wsk.rule, ruleName) { (rule, name) =>
+        rule.create(name, trigger = triggerName, action = defaultActionName)
+      }
+
+      assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+
+      produceMessage(topic, key, verificationName)
+      retry(wsk.trigger.get(verificationName), 60, Some(1.second))
+
+      wsk.trigger.delete(verificationName, expectedExitCode = SUCCESS_EXIT)
+      wsk.trigger.delete(triggerName, expectedExitCode = SUCCESS_EXIT)
+
+      val feedCreationResult = wsk.trigger.create(triggerName, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters)
+      val activation = wsk.parseJsonString(feedCreationResult.stdout.substring(0, feedCreationResult.stdout.indexOf("ok: created trigger"))).convertTo[ActivationResult]
+      activation.response.success shouldBe true
+
+      wsk.rule.enable(ruleName, expectedExitCode = SUCCESS_EXIT)
+
+      println("Giving the consumer a moment to get ready")
+      Thread.sleep(KafkaUtils.consumerInitTime)
+
+      val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
+      consumerExists(uuid)
+
+      produceMessage(topic, key, verificationName)
+      retry(wsk.trigger.get(verificationName), 60, Some(1.second))
+  }
+
   it should "fire multiple triggers for two large payloads" in 
withAssetCleaner(wskprops) {
     // payload size should be under the payload limit, but greater than 50% of 
the limit
     val testPayloadSize = 600000
