This is an automated email from the ASF dual-hosted git repository. japetrsn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push: new 566990c Do not skip last sequence if an exception occurs (#364) 566990c is described below commit 566990cfb1c113877b318857036f9151bc4430d1 Author: James Dubee <jwdu...@us.ibm.com> AuthorDate: Mon Jan 6 14:21:43 2020 -0500 Do not skip last sequence if an exception occurs (#364) --- provider/service.py | 25 ++++++--- .../system/packages/MessageHubFeedTests.scala | 61 +++++++++++++++++++++- 2 files changed, 78 insertions(+), 8 deletions(-) diff --git a/provider/service.py b/provider/service.py index fa8e109..b8a272f 100644 --- a/provider/service.py +++ b/provider/service.py @@ -68,11 +68,6 @@ class Service (Thread): # check whether or not the feed is capable of detecting canary # documents if change != None: - # Record the sequence in case the changes feed needs to be - # restarted. This way the new feed can pick up right where - # the old one left off. - self.lastSequence = change['seq'] - if "deleted" in change and change["deleted"] == True: logging.info('[changes] Found a delete') consumer = self.consumers.getConsumerForTrigger(change['id']) @@ -109,7 +104,20 @@ class Service (Thread): elif triggerIsAssignedToMe: logging.info('[{}] Found a change to an existing trigger'.format(change["id"])) - if existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document): + if existingConsumer.desiredState() == Consumer.State.Dead and self.__isTriggerDocActive(document): + # if a delete occurs followed quickly by a create the consumer might get stuck in a dead state, + # so we need to forcefully delete the process before recreating it. + logging.info('[{}] A create event occurred for a trigger that is shutting down'.format(change["id"])) + + if existingConsumer.process.is_alive(): + logging.info('[{}] Joining dead process.'.format(existingConsumer.trigger)) + existingConsumer.process.join(1) + else: + logging.info('[{}] Process is already dead.'.format(existingConsumer.trigger)) + + self.consumers.removeConsumerForTrigger(existingConsumer.trigger) + self.createAndRunConsumer(document) + elif existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document): # disabled trigger has become active logging.info('[{}] Existing disabled trigger should become active'.format(change["id"])) self.createAndRunConsumer(document) @@ -123,6 +131,11 @@ class Service (Thread): self.lastCanaryTime = datetime.now() else: logging.debug('[changes] Found a change for a non-trigger document') + + # Record the sequence in case the changes feed needs to be + # restarted. This way the new feed can pick up right where + # the old one left off. + self.lastSequence = change['seq'] except Exception as e: logging.error('[canary] Exception caught from changes feed. Restarting changes feed...') logging.error(e) diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala index 0c05d63..9974bf5 100644 --- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala +++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala @@ -16,10 +16,9 @@ */ package system.packages -import system.utils.KafkaUtils - import scala.concurrent.duration.DurationInt import scala.language.postfixOps +import system.utils.KafkaUtils import org.junit.runner.RunWith import org.scalatest.BeforeAndAfterAll import org.scalatest.FlatSpec @@ -40,6 +39,8 @@ import common.TestUtils.NOT_FOUND import org.apache.openwhisk.utils.retry import org.apache.openwhisk.core.entity.Annotations import java.util.concurrent.ExecutionException +import common.ActivationResult +import common.TestUtils.SUCCESS_EXIT @RunWith(classOf[JUnitRunner]) class MessageHubFeedTests @@ -113,6 +114,62 @@ class MessageHubFeedTests runActionWithExpectedResult(actionName, "dat/multipleValueTypes.json", expectedOutput, false) } + it should "create a trigger, delete that trigger, and quickly create it again with successful trigger fires" in withAssetCleaner(wskprops) { + val currentTime = s"${System.currentTimeMillis}" + + (wp, assetHelper) => + val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" + val ruleName = s"dummyMessageHub-helloKafka-$currentTime" + val parameters = Map( + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "api_key" -> getAsJson("api_key"), + "kafka_admin_url" -> getAsJson("kafka_admin_url"), + "kafka_brokers_sasl" -> getAsJson("brokers"), + "topic" -> topic.toJson + ) + + val key = "TheKey" + val verificationName = s"trigger-$currentTime" + val defaultAction = Some("dat/createTriggerActions.js") + val defaultActionName = s"helloKafka-$currentTime" + + createTrigger(assetHelper, triggerName, parameters) + + assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) => + action.create(name, defaultAction, annotations = Map(Annotations.ProvideApiKeyAnnotationName -> JsBoolean(true))) + } + + assetHelper.withCleaner(wsk.rule, ruleName) { (rule, name) => + rule.create(name, trigger = triggerName, action = defaultActionName) + } + + assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) => + trigger.get(name, NOT_FOUND) + } + + produceMessage(topic, key, verificationName) + retry(wsk.trigger.get(verificationName), 60, Some(1.second)) + + wsk.trigger.delete(verificationName, expectedExitCode = SUCCESS_EXIT) + wsk.trigger.delete(triggerName, expectedExitCode = SUCCESS_EXIT) + + val feedCreationResult = wsk.trigger.create(triggerName, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters) + val activation = wsk.parseJsonString(feedCreationResult.stdout.substring(0, feedCreationResult.stdout.indexOf("ok: created trigger"))).convertTo[ActivationResult] + activation.response.success shouldBe true + + wsk.rule.enable(ruleName, expectedExitCode = SUCCESS_EXIT) + + println("Giving the consumer a moment to get ready") + Thread.sleep(KafkaUtils.consumerInitTime) + + val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "") + consumerExists(uuid) + + produceMessage(topic, key, verificationName) + retry(wsk.trigger.get(verificationName), 60, Some(1.second)) + } + it should "fire multiple triggers for two large payloads" in withAssetCleaner(wskprops) { // payload size should be under the payload limit, but greater than 50% of the limit val testPayloadSize = 600000