-----Original Message-----
From: Daiter, Sophie
Sent: Wednesday, November 01, 2023 13:03
To: [email protected]
Subject: RE: WELCOME to [email protected]
Hi,
I am using Apache.NMS.ActiveMQ.NetStd nuget version 1.8.0 We implemented
producer and consumer.
Everything is working fine but few times a day we encounter strange behavior
where "producer.send(message)" finishes successfully but the consumer doesn’t
get the message.
Could you help me to figure out why this might happen?
Can the actual queue may be loosing messages? Maybe producer.send() is
swallowing exceptions it has and doesn’t succeed?
Attached producer and consumer we implemented
Please advice
Sophie
-----Original Message-----
From: [email protected] <[email protected]>
Sent: Wednesday, November 01, 2023 12:51
To: Daiter, Sophie <[email protected]>
Subject: WELCOME to [email protected]
Hi! This is the ezmlm program. I'm managing the [email protected]
mailing list.
I'm working for my owner, who can be reached at [email protected].
Acknowledgment: I have added the address
[email protected]
to the users mailing list.
Welcome to [email protected]!
Please save this message so that you know the address you are subscribed under,
in case you later want to unsubscribe or change your subscription address.
--- Administrative commands for the users list ---
I can handle administrative requests automatically. Please do not send them to
the list address! Instead, send your message to the correct command address:
To subscribe to the list, send a message to:
<[email protected]>
To remove your address from the list, send a message to:
<[email protected]>
Send mail to the following for info and FAQ for this list:
<[email protected]>
<[email protected]>
Similar addresses exist for the digest list:
<[email protected]>
<[email protected]>
To get messages 123 through 145 (a maximum of 100 per request), mail:
<[email protected]>
To get an index with subject and author for messages 123-456 , mail:
<[email protected]>
They are always returned as sets of 100, max 2000 per request, so you'll
actually get 100-499.
To receive all messages with the same subject as message 12345, send a short
message to:
<[email protected]>
The messages should contain one line or word of text to avoid being treated as
sp@m, but I will ignore their content.
Only the ADDRESS you send to is important.
You can start a subscription for an alternate address, for example
"[email protected]", just add a hyphen and your address (with '=' instead of
'@') after the command word:
<[email protected]>
To stop subscription for this address, mail:
<[email protected]>
In both cases, I'll send a confirmation message to that address. When you
receive it, simply reply to it to complete your subscription.
If despite following these instructions, you do not get the desired results,
please contact my owner at [email protected]. Please be patient,
my owner is a lot slower than I am ;-)
--- Enclosed is a copy of the request I received.
Return-Path: <[email protected]>
Received: (qmail 651951 invoked by uid 116); 1 Nov 2023 10:50:55 -0000
Received: from spamproc1-he-fi.apache.org (HELO spamproc1-he-fi.apache.org)
(95.217.134.168) by apache.org (qpsmtpd/0.94) with ESMTP; Wed, 01 Nov 2023
10:50:55 +0000
Authentication-Results: apache.org; auth=none
Received: from localhost (localhost [127.0.0.1])
by spamproc1-he-fi.apache.org (ASF Mail Server at
spamproc1-he-fi.apache.org) with ESMTP id 8329AC0CAE
for
<users-sc.1698835750.jcinbcigacfbiajlibhg-sophie.daiter=intel....@activemq.apache.org>;
Wed, 1 Nov 2023 10:50:55 +0000 (UTC)
X-Virus-Scanned: Debian amavisd-new at spamproc1-he-fi.apache.org
X-Spam-Flag: NO
X-Spam-Score: -0.201
X-Spam-Level:
X-Spam-Status: No, score=-0.201 tagged_above=-999 required=6.31
tests=[DKIMWL_WL_HIGH=-0.001, DKIM_SIGNED=0.1, DKIM_VALID=-0.1,
DKIM_VALID_AU=-0.1, DKIM_VALID_EF=-0.1, SPF_PASS=-0.001,
URIBL_BLOCKED=0.001] autolearn=disabled
Authentication-Results: spamproc1-he-fi.apache.org (amavisd-new);
dkim=pass (2048-bit key) header.d=intel.com
Received: from mx1-ec2-va.apache.org ([116.203.227.195])
by localhost (spamproc1-he-fi.apache.org [95.217.134.168])
(amavisd-new, port 10024)
with ESMTP id 9_ygBo4SCq-W
for
<users-sc.1698835750.jcinbcigacfbiajlibhg-sophie.daiter=intel....@activemq.apache.org>;
Wed, 1 Nov 2023 10:50:53 +0000 (UTC)
Received-SPF: Pass (mailfrom) identity=mailfrom; client-ip=192.55.52.93;
helo=mgamail.intel.com; [email protected];
receiver=<UNKNOWN>
Received: from mgamail.intel.com (mgamail.intel.com [192.55.52.93])
by mx1-ec2-va.apache.org (ASF Mail Server at mx1-ec2-va.apache.org)
with ESMTPS id 4DCD7BEE5D
for
<users-sc.1698835750.jcinbcigacfbiajlibhg-sophie.daiter=intel....@activemq.apache.org>;
Wed, 1 Nov 2023 10:50:51 +0000 (UTC)
DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple;
d=intel.com; [email protected]; q=dns/txt; s=Intel;
t=1698835852; x=1730371852;
h=from:to:subject:date:message-id:references:in-reply-to:
mime-version:content-transfer-encoding;
bh=vo8m+2O2mKgicetWUN0HkoYjmQHSZmS+02zQy6SD4PI=;
b=CIzGG0U6+4I9GFmjxFKoclr5G5maOZf7cgMIHnxb3xa4huNuntDSZ1co
uHKK9Z9v25WuWHMkR4Gghmtu2A+1LDR3pkA4+WUwW5RGfzoZjJcx17H93
inGimBa0tY7qeFLaz5mbKIcz98bWbT4oi6sQbs229V9LjOJc6ftWJ4xhF
YwEqL3NEyYjNeBZkcuPVSB6TMMWuyCH06euWmuqFWxtA2Un9xRHt1WuGu
570eZzoigq6iM/qfmP8cwh6SVULSOKJHIa764ntp72LAxHuhIYoVEu3l7
e4IPCjyWiF8IwTe7TU+Tgo+HN12Cg+cGi3gyvXfTSck3SwPw0QILpB5vY
Q==;
X-IronPort-AV: E=McAfee;i="6600,9927,10880"; a="385650084"
X-IronPort-AV: E=Sophos;i="6.03,268,1694761200";
d="scan'208";a="385650084"
Received: from orsmga004.jf.intel.com ([10.7.209.38])
by fmsmga102.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 01 Nov
2023 03:50:44 -0700
X-ExtLoop1: 1
X-IronPort-AV: E=McAfee;i="6600,9927,10880"; a="884528894"
X-IronPort-AV: E=Sophos;i="6.03,268,1694761200";
d="scan'208";a="884528894"
Received: from orsmsx603.amr.corp.intel.com ([10.22.229.16])
by orsmga004.jf.intel.com with ESMTP/TLS/AES256-GCM-SHA384; 01 Nov 2023
03:50:44 -0700
Received: from orsmsx603.amr.corp.intel.com (10.22.229.16) by
ORSMSX603.amr.corp.intel.com (10.22.229.16) with Microsoft SMTP Server
(version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256) id
15.1.2507.34; Wed, 1 Nov 2023 03:50:43 -0700
Received: from ORSEDG601.ED.cps.intel.com (10.7.248.6) by
orsmsx603.amr.corp.intel.com (10.22.229.16) with Microsoft SMTP Server
(version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256) id
15.1.2507.34 via Frontend Transport; Wed, 1 Nov 2023 03:50:43 -0700
Received: from NAM10-BN7-obe.outbound.protection.outlook.com (104.47.70.101)
by edgegateway.intel.com (134.134.137.102) with Microsoft SMTP Server
(version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id
15.1.2507.34; Wed, 1 Nov 2023 03:50:43 -0700
ARC-Seal: i=1; a=rsa-sha256; s=arcselector9901; d=microsoft.com; cv=none;
b=nzPk3QVZEuTKyrA2kaYKdJVaxEUr8t6y+MIvPjp8flvzhWPHYWeItdIEkB+DXY56Kzlk7ygTApalIfdG8XbAF3P/Cb5J6pMpu1WyWF6Ga9bjbjufAW5AAXaw2vkcp5/I4lWXaqKuOAoIUQkldeCbLpKrLhP7DbNzq3Zso5g1fWRaxFx39WK0FAxfjr67vKIKADEv0rn1/SAPgFI1JGgZ0xe6UEyTAC7iPUN564OQPlK3nedRZuep/KPxuMgF6qu1wQ9bdL+Q1GiIsm9g2m8YIetLWA7cn7jR3/G9FhevY+ilCZYY9b5OKvRhHQ/WtdpEJtpXBX737oRgowal391g0A==
ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=microsoft.com;
s=arcselector9901;
h=From:Date:Subject:Message-ID:Content-Type:MIME-Version:X-MS-Exchange-AntiSpam-MessageData-ChunkCount:X-MS-Exchange-AntiSpam-MessageData-0:X-MS-Exchange-AntiSpam-MessageData-1;
bh=qq+nCUaXnzoSSZJui/l9uazCgZswOKZgWvZd+bCcneU=;
b=lW9j3kjObHpAY0nM/R4qiuUbjxZrJWOdfXOG6lKSBPT0pEPLNXsd1iwEf5XB/mgWDaK/ZAGIKpzQXVNdXjfVB60jqi4G3ZvsTnpDlDJfMdi5fOjkmwPMHqg9VuO+veonKA/bEyfceoI9uo7FgDHPthD4Adekqu41EjEI6eRcaSOUvYpBBMMcmpxJrWcvLiW209SDUMiHOu6etvyJzarzhyKhURjjvOtupA2JtOE+S8kuTd/mO5ju7W/DSywvm4HUB3hecagoCxQNq6/LrOpZE8h0NrjHSMQM3gBo9eJZT0457zKrsbiRxE0p2cPYZ+BSyErcviUxnarf/jbTUFGwYA==
ARC-Authentication-Results: i=1; mx.microsoft.com 1; spf=pass
smtp.mailfrom=intel.com; dmarc=pass action=none header.from=intel.com;
dkim=pass header.d=intel.com; arc=none
Received: from DM4PR11MB8180.namprd11.prod.outlook.com (2603:10b6:8:18d::17)
by CY5PR11MB6511.namprd11.prod.outlook.com (2603:10b6:930:41::7) with
Microsoft SMTP Server (version=TLS1_2,
cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id 15.20.6933.29; Wed, 1 Nov
2023 10:50:41 +0000
Received: from DM4PR11MB8180.namprd11.prod.outlook.com
([fe80::600:1cfe:696f:5d14]) by DM4PR11MB8180.namprd11.prod.outlook.com
([fe80::600:1cfe:696f:5d14%5]) with mapi id 15.20.6933.029; Wed, 1 Nov 2023
10:50:41 +0000
From: "Daiter, Sophie" <[email protected]>
To:
"users-sc.1698835750.jcinbcigacfbiajlibhg-sophie.daiter=intel....@activemq.apache.org"
<users-sc.1698835750.jcinbcigacfbiajlibhg-sophie.daiter=intel....@activemq.apache.org>
Subject: RE: confirm subscribe to [email protected]
Thread-Topic: confirm subscribe to [email protected]
Thread-Index: AQHaDLER9RKvdXCE+U+WU2P8G3ZkvrBlSPeA
Date: Wed, 1 Nov 2023 10:50:41 +0000
Message-ID:
<dm4pr11mb8180ca54142710aefeed6d8ee6...@dm4pr11mb8180.namprd11.prod.outlook.com>
References: <[email protected]>
In-Reply-To: <[email protected]>
Accept-Language: en-US
Content-Language: en-US
X-MS-Has-Attach:
X-MS-TNEF-Correlator:
authentication-results: dkim=none (message not signed)
header.d=none;dmarc=none action=none header.from=intel.com;
x-ms-publictraffictype: Email
x-ms-traffictypediagnostic: DM4PR11MB8180:EE_|CY5PR11MB6511:EE_
x-ms-office365-filtering-correlation-id: 22dca17c-3d33-422c-b6fd-08dbdac8641c
x-ms-exchange-senderadcheck: 1
x-ms-exchange-antispam-relay: 0
x-microsoft-antispam: BCL:0;
x-microsoft-antispam-message-info:
bkZw5Y0YU3LD0+py4tz+Bq00QLHJJPVVPLhXPZVTQZUpGvM/Kciy7RsaURAAAmm/GkC+jwbVvJGMkFdgE950uUpcSQ0yS2UAtAlyPmvd4ntL4SD3a+VE0f75d2aogxNIaDv8bExKV2hmmwW6Ot5IncgjOTBQyd7M7S8C36dzYVJJqS9rbnRkonZjCcV6blZtVl6A4Funz7GB5kPp3iEgV7xt+b/yhAQrPymfxcCCc/CZzazp8qBT3cdZOg5r1JgsEf/+KS4YRPN+KjcAukOVLpeVOKRbGyg9HPXocnFnGs3l2CkEkUnNPo9iuPdH/qRppEAxm3E2lD9QAEDPRgiBlkm8CmZuLZ/LLATa24cYyI2g0S+yVnZwtp1WfP3Pe2dma4hl2YEsmp1bG3kJPm9uoy7GWJpyIudLphDio3EJJiF+0zKNb83ilWxGtiVWC+LuerrkW9/X6Xgk9gP9fVuMpsd1pUYxMdxs2TL228zV9BH0fnLAFbkp2TW5gLvabz2hhV8WMOMp+BoGxMaxidCV2VSfA/ZLp6Lrts4/32V3mUKsRta7La6rk/HQ5BtLAmQYZju16GmdiPumNgz5Jmm6nccRnG13a4nRG6yKNQqzqSM=
x-forefront-antispam-report:
CIP:255.255.255.255;CTRY:;LANG:en;SCL:1;SRV:;IPV:NLI;SFV:NSPM;H:DM4PR11MB8180.namprd11.prod.outlook.com;PTR:;CAT:NONE;SFS:(13230031)(396003)(366004)(136003)(39860400002)(346002)(376002)(230922051799003)(186009)(451199024)(1800799009)(64100799003)(55016003)(26005)(9686003)(45080400002)(53546011)(52536014)(6506007)(478600001)(71200400001)(2906002)(66574015)(83380400001)(7696005)(30864003)(316002)(5660300002)(66556008)(41300700001)(76116006)(66946007)(19627235002)(66476007)(8676002)(66446008)(966005)(64756008)(8936002)(33656002)(38100700002)(122000001)(82960400001)(86362001)(38070700009)(66899024);DIR:OUT;SFP:1102;
x-ms-exchange-antispam-messagedata-chunkcount: 1
x-ms-exchange-antispam-messagedata-0:
=?utf-8?B?Rmp1dU1YalYyZ3UxRFF1b1NjYVVkSjFHRDdvYUhENUxsSTJ5RDRCSHBDR3Jp?=
=?utf-8?B?MUxBbVJpQVg0Z2R5SGtVMWNHTGlRWjFWSDI3QjVKOFhFNmZFWHdGbm5yenVH?=
=?utf-8?B?eDI4Ylo4RUhJcHQxWStUdSs4cHc5YzM5WGpoeFFLd2djRWhrSWZTVGMyU1pu?=
=?utf-8?B?L1hPMDVFdDE4MkZ4Sno3akNQWTJRTjJKeC9OUEdKRkxzWWp3ZlJuNTE2TXlw?=
=?utf-8?B?Y252bFpLbTVLd2ZYZDZFMGYzTFRicmJXa050TS95ZDUxdnFIckdxald2VUIz?=
=?utf-8?B?M2NUZHA1RzMzNlFBN3BKMCs5ZExWQXhDcWREYU9JOEV2eUJKcGZHVHRZYm53?=
=?utf-8?B?OFdmYU1lTm1DRW8rMllaRUk3eDdlWjQrWUNnbjNSN09sOEQvbE9hYklMVG1i?=
=?utf-8?B?NCtGWFBSaFQyZjQ5czRhbkFkajEwS3ZGcUNBbElSd0tMbzMyb1pIZWdER0Jr?=
=?utf-8?B?b3VTUi9oVGEreGRtWFYxUmJlNU5weHR1NVhnZjFwSlh6MzRhZlYyMUl2R3NC?=
=?utf-8?B?L3dMSUZEWlE5U1Z4TDhtRk1RQTVzWEpVLytidHE0OC9kbWY2TTVZeDFJbkRv?=
=?utf-8?B?aTNpRmV1ckllN002NlZGeVNEUVMvTDJJTndzVzNybVlkMEpEdXRUTkxUcitV?=
=?utf-8?B?enAyMktyNTNaOXJYNW1nUEZWdU96bVErM0lIUnQra3lTamdyVml0LzJTWjRL?=
=?utf-8?B?bUNVQXkzNXl2UmhBc0lyNzdESTRlYU1KV2lMbDBXNlIza2Y2enNXcEl1ZUhZ?=
=?utf-8?B?bEZ1RlZ6bmZBa25JbHcvZ3ZYWGxuR3pRLzd5c3FScmtHS2luTldLeEg2Yy9a?=
=?utf-8?B?NFBEcHFCb2FmNnNVRDVxMko1d0h6THpMZml1UzZlUDM4WWxxS3gvYVFzajhH?=
=?utf-8?B?eDBjUW1XMTdwN1N4VmpyQ2Z6T3lUY2lRS3pxcEpxOTBWRnY3cHBGb0Q3NnNY?=
=?utf-8?B?UkJUTEk3eS9vT21YSG9GdVowTFN2VThUN3BoSjJGSXNpZSs0TTdwUzg0MUpt?=
=?utf-8?B?WlNlcEZta0tqVTRIczVKQ1FtN2hDSHBtbkZCSFYrR2paYnF1NWh4akdTdU5r?=
=?utf-8?B?emNpd2pwakxoY3BJTUtIaVk0YkxEYkV5YjQzYmJlL1VTVi8zVjdkR1ZLK010?=
=?utf-8?B?V3pKQzM2aEswTUtDU0Q1L203TVpPeXdZUTNpYlV3ZnVHdVM2Q0FWb0h1UDRh?=
=?utf-8?B?djVEQzBoNy9NanJVYXBibWlWdnprZHNBRWh4L2IwOU5KdmZ4UGNSZThWdVN2?=
=?utf-8?B?cERpUDZvQlRwSGVEaTluMERlT2NRR0VvQW1pTnJmU1MrakFCekc3NktXQXBa?=
=?utf-8?B?UXNZOUgzeWhJUjJjSFljcGorSjVhMy8yUnYxemRZaWdrblVCekpaZEFSMEQr?=
=?utf-8?B?d21nT295ejJSQWNQWW9aS3pXOG54dCtET0NEYVdtUEdUK1l1WTZrZzBhblZk?=
=?utf-8?B?TzNrMEpjakhyd1JCNEQ2eUtudDFTZTJINi9obU5sWDBSbDh4VURBRDR1VXN2?=
=?utf-8?B?Q2pHelZyenV5djlaWnk0Zmd0UEduV0x4VU4zK0UzazErUGFyTlhmNmtjSndN?=
=?utf-8?B?MHNqbXZjY3RsTTgvL1dzVElDT3BJTVlwZnhpdDJQTzRBWklFcXFlazRaeWk3?=
=?utf-8?B?Vmh0Yk5sdk1YaWtHcW4yTUtEK1ZmZWxCSHVXWUF1VDA3cjl3SS9wUVNrNk9B?=
=?utf-8?B?anRZNXdKS0RGUHFUcldLUURIYXN6bDF2elRTUGdoUm80Rm43bHc0aUlFWkhH?=
=?utf-8?B?SmJ5RXNKUEtlREtTeXJUbzg3K0lJTjlqMTI4ZkYrRXNQdCtMVVlMODVlRHVj?=
=?utf-8?B?SkowU2V5a2Y3WlZEakFFU0ZtYUFJbU0wRk0xbGRlQXVsYUdtd0g1RjBHMDR6?=
=?utf-8?B?TWJvNXhSSXlQblM3V2FGTUtEZmREODd2S3ZPNHFPS09JSi8wTG9nOTA5TnRk?=
=?utf-8?B?TitkWjhzSHlBTll3dDdyVU5hVzNwWnNyb3VBUW1xM21odzFsbXBzU2FKOW1T?=
=?utf-8?B?dkx4NGhPZENjWTlYdlF4aGFGdHQ0MW1SL296WDhKWHVxZ2lwT2FzeTNndVk1?=
=?utf-8?B?SWh4MGpLa1J0VlhmTmpYdUY5Q0dwUmpEekpyb1AyRG1sQjBLdGo4Y0RpaVpz?=
=?utf-8?Q?5EjcC6LijXri3hdgtRTQF5jbG?=
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
X-MS-Exchange-CrossTenant-AuthAs: Internal
X-MS-Exchange-CrossTenant-AuthSource: DM4PR11MB8180.namprd11.prod.outlook.com
X-MS-Exchange-CrossTenant-Network-Message-Id:
22dca17c-3d33-422c-b6fd-08dbdac8641c
X-MS-Exchange-CrossTenant-originalarrivaltime: 01 Nov 2023 10:50:41.0850
(UTC)
X-MS-Exchange-CrossTenant-fromentityheader: Hosted
X-MS-Exchange-CrossTenant-id: 46c98d88-e344-4ed4-8496-4ed7712e255d
X-MS-Exchange-CrossTenant-mailboxtype: HOSTED
X-MS-Exchange-CrossTenant-userprincipalname:
pp3lhzkNI27jWif2D3GmGIRahJzwyh9L6vTJnRjAfn12lxoinuSxNZ0pBlA+V4N368uC+jIjyVtJoH0AG/59UQ==
X-MS-Exchange-Transport-CrossTenantHeadersStamped: CY5PR11MB6511
X-OriginatorOrg: intel.com
Content-Transfer-Encoding: base64
---------------------------------------------------------------------
Intel Israel (74) Limited
This e-mail and any attachments may contain confidential material for
the sole use of the intended recipient(s). Any review or distribution
by others is strictly prohibited. If you are not the intended
recipient, please contact the sender and delete all copies.
using System;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using CSharpFunctionalExtensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using OrchestratorService.Configuration;
using OrchestratorService.Logging;
using IAmqMessage = Apache.NMS.IMessage;
namespace OrchestratorService.Queue.ActiveMq
{
public class ActiveMqConsumer<T> : IQueueConsumer
where T : IMessage
{
private readonly ISparkLogger<ActiveMqConsumer<T>> _logger;
private readonly IOptions<QueueOptions> _queueOptions;
private Apache.NMS.IMessageConsumer _consumer;
private Func<string, Task> _onMessage;
private IConnection _connection;
private ISession _session;
public ActiveMqConsumer(ISparkLogger<ActiveMqConsumer<T>> logger,
IOptions<QueueOptions> queueOptions)
{
_logger = logger;
_queueOptions = queueOptions;
}
public Result StartConsuming(Func<string, Task> onMessage)
{
var config = _queueOptions.Value;
_onMessage = onMessage;
try
{
var factory = new ConnectionFactory(config.BrokerUri);
_connection = factory.CreateConnection(config.Username,
config.Password);
_connection.Start();
// todo: use transactional ack to handle failures in execution
// _session =
_connection.CreateSession(AcknowledgementMode.Transactional); // with client
ack for a specific message:
https://activemq.apache.org/components/nms/msdoc/1.6.0/vs2005/Output/html/T_Apache_NMS_AcknowledgementMode.htm
_session = _connection.CreateSession();
var destination = _session.GetQueue(config.QueueName);
var messageType = typeof(T).Name;
var selector = $"messageType = '{messageType}'";
_consumer = _session.CreateConsumer(destination, selector);
_logger.LogInformation("Start consuming messages....");
_consumer.Listener += OnTextMessage;
_connection.ExceptionListener += ConnectionOnExceptionListener;
_connection.ConnectionInterruptedListener +=
ConnectionOnConnectionInterruptedListener;
_connection.ConnectionResumedListener +=
ConnectionOnConnectionResumedListener;
}
catch (Exception e)
{
return Result.Fail($"Cannot start handling messages, error:
{e}");
}
return Result.Ok();
}
public Result StopConsuming()
{
try
{
_consumer.Listener -= OnTextMessage;
_onMessage = null;
_consumer.Dispose();
_session.Dispose();
_connection.ExceptionListener -= ConnectionOnExceptionListener;
_connection.ConnectionInterruptedListener -=
ConnectionOnConnectionInterruptedListener;
_connection.ConnectionResumedListener -=
ConnectionOnConnectionResumedListener;
_connection.Stop();
_connection.Dispose();
}
catch (Exception e)
{
return Result.Fail($"Cannot stop handling messages, error:
{e}");
}
_logger.LogInformation("Stopped consuming messages....");
return Result.Ok();
}
// see:
https://docs.microsoft.com/en-us/archive/msdn-magazine/2013/march/async-await-best-practices-in-asynchronous-programming
// for explanation on async void here
private async void OnTextMessage(IAmqMessage message)
{
if (!(message is ITextMessage textMessage))
{
_logger.LogCritical($"received message {message.NMSMessageId}
is not a text message");
return;
}
_logger.LogInformation($"Received message: {textMessage.Text}");
if (_onMessage == null)
{
_logger.LogCritical($"cannot handle message
{message.NMSMessageId}, onMessage is null");
}
else
{
await _onMessage(textMessage.Text);
}
}
// Active MQ has automatic failover when the connection is interrupted
// the connection is recreated & resumed automatically
private void ConnectionOnConnectionResumedListener()
{
_logger.LogInformation("ActiveMQ consumer connection resumed");
}
private void ConnectionOnConnectionInterruptedListener()
{
_logger.LogInformation("ActiveMQ consumer connection interrupted");
}
private void ConnectionOnExceptionListener(Exception exception)
{
_logger.LogError($"ActiveMQ consumer connection caught exception:
${exception}");
}
}
}
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using CSharpFunctionalExtensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using OrchestratorService.Configuration;
using OrchestratorService.Logging;
namespace OrchestratorService.Queue.ActiveMq
{
public class ActiveMqProducer : IQueueProducer, IDisposable
{
private readonly ISparkLogger<ActiveMqProducer> _logger;
private readonly IConnection _connection;
private readonly ISession _session;
private readonly Apache.NMS.IMessageProducer _producer;
public ActiveMqProducer(IOptions<QueueOptions> queueOptions,
ISparkLogger<ActiveMqProducer> logger)
{
_logger = logger;
var config = queueOptions.Value;
var factory = new ConnectionFactory(config.BrokerUri);
_connection = factory.CreateConnection(config.Username,
config.Password);
_connection.Start();
_session = _connection.CreateSession();
var destination = _session.GetQueue(config.QueueName);
_producer = _session.CreateProducer(destination);
_connection.ExceptionListener += ConnectionOnExceptionListener;
_connection.ConnectionInterruptedListener +=
ConnectionOnConnectionInterruptedListener;
_connection.ConnectionResumedListener +=
ConnectionOnConnectionResumedListener;
}
public void Dispose()
{
try
{
_producer.Dispose();
_session.Dispose();
_connection.ExceptionListener -= ConnectionOnExceptionListener;
_connection.ConnectionInterruptedListener -=
ConnectionOnConnectionInterruptedListener;
_connection.ConnectionResumedListener -=
ConnectionOnConnectionResumedListener;
_connection.Stop();
_connection.Dispose();
}
catch (Exception e)
{
_logger.LogCritical($"failed to dispose producer, error: {e}");
}
_logger.LogInformation("producer disposed successfully");
}
public Result QueueMessage<T>(string message)
{
var messageToSend = _session.CreateTextMessage(message);
// create a property with the message type that can be used as a
selector by the consumer.
// this can be used later on for messages that are of other types
than WorkItemMessage
var messageType = typeof(T).Name;
messageToSend.Properties.SetString("messageType", messageType);
_logger.LogInformation($"Sending message to queue: {message}...");
try
{
_producer.Send(messageToSend);
}
catch (Exception e)
{
var error = $"failed to add message to queue, \nerror: ${e}
\nmessage: {message}";
return Result.Fail(error);
}
_logger.LogInformation($"message was added successfully to queue,
message: {message}");
return Result.Ok();
}
// Active MQ has automatic failover when the connection is interrupted
// the connection is recreated & resumed automatically
private void ConnectionOnConnectionResumedListener()
{
_logger.LogInformation("ActiveMQ producer connection resumed");
}
private void ConnectionOnConnectionInterruptedListener()
{
_logger.LogInformation("ActiveMQ producer connection interrupted");
}
private void ConnectionOnExceptionListener(Exception exception)
{
_logger.LogError($"ActiveMQ producer connection caught exception:
${exception}");
}
}
}