Sorry, tied up with a couple of things here. Literally just opened the IDE
to finish this off. :)

Jon

On Wed, Jun 16, 2021 at 9:46 AM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

> hi Jon,
>
> Is there anything else you need from my end?  It feels like it is nearly
> finished.  Could it pass the parameters in a similar way to how it is
> done in the chatterbox-imap?  sending them in from the tomee.xml?
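One way to picture pulling the base address, cluster ID and client ID out of the code is a small settings holder filled from properties. This is only a sketch under assumptions: the class name `NatsSettings`, the property keys and the default values are hypothetical, and this is not how chatterbox-imap or the NATS adapter is actually wired.

```java
import java.util.Properties;

public class NatsSettings {
    final String baseAddress;
    final String clusterId;
    final String clientId;

    NatsSettings(String baseAddress, String clusterId, String clientId) {
        this.baseAddress = baseAddress;
        this.clusterId = clusterId;
        this.clientId = clientId;
    }

    // Hypothetical keys and defaults; a real adapter would define its own.
    static NatsSettings from(Properties props) {
        return new NatsSettings(
                props.getProperty("baseAddress", "nats://localhost:4222"),
                props.getProperty("clusterId", "yourclusterid"),
                props.getProperty("clientId", "yourclientid"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("clusterId", "my-cluster");
        NatsSettings s = from(props);
        // Only clusterId was overridden; the other two fall back to defaults.
        System.out.println(s.baseAddress + " " + s.clusterId + " " + s.clientId);
    }
}
```

The values held here are the same three that the StreamingConnectionFactory examples further down the thread hard-code.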
>
>
> On 10/06/2021 19:07, Jonathan Gallimore wrote:
> > Thank you! That worked. I have pushed an update to my code, and I've been
> > able to send a message to NATS from a REST endpoint, and receive a message
> > from NATS via an MDB.
> >
> > I still need to extract the cluster ID and client ID into properties for
> > the resource adapter, and of course, try and write up how this works. Code
> > is here: https://github.com/jgallimore/tomee-chatterbox/tree/nats, but I'll
> > merge it in once I have done these couple of changes.
> >
> > One other thing on my mind is that I'd like to try and find some way to
> > make all this easier. If you've used JMS, you've used JCA, possibly without
> > realizing it, but the spec still feels very hard to get into - I wonder if
> > there is anything we can propose in that regard to try and make creating
> > simple connectors a bit easier.
> >
> > Jon
> >
> > On Wed, Jun 9, 2021 at 4:31 PM Matthew Broadhead
> > <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
> >
> >> Hi, it was my fault for putting a confusing value in the docker-compose
> >> file: the -cid flag sets the cluster ID, and I had set it to
> >> "yourclientid". It should work like this:
> >>
> >> StreamingConnectionFactory cf = new StreamingConnectionFactory(
> >>         new Options.Builder().natsUrl("nats://localhost:4222")
> >>                 .clusterId("yourclientid").clientId("anything").build());
> >>
> >> But you could change the docker-compose.yml to have a -cid of
> >> yourclusterid and then do this:
> >>
> >> StreamingConnectionFactory cf = new StreamingConnectionFactory(
> >>         new Options.Builder().natsUrl("nats://localhost:4222")
> >>                 .clusterId("yourclusterid").clientId("yourclientid").build());
> >>
> >> On 09/06/2021 17:15, Jonathan Gallimore wrote:
> >>> Thanks. If I can get that test going, I can probably get the rest working.
> >>> I suspect there are some other bugs in there.
> >>>
> >>> Jon
> >>>
> >>> On Wed, Jun 9, 2021 at 4:04 PM Matthew Broadhead
> >>> <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
> >>>
> >>>> Thanks I will have a look now
> >>>>
> >>>> On 09/06/2021 15:19, Jonathan Gallimore wrote:
> >>>>> Ok, I'm stuck. If I boot up a NATS server with your docker-compose.yml
> >>>>> file, and run the following test:
> >>>>>
> >>>>>        @Test
> >>>>>        public void testShouldConnect() throws Exception {
> >>>>>            StreamingConnectionFactory cf = new StreamingConnectionFactory(
> >>>>>                    new Options.Builder().natsUrl("nats://localhost:4222")
> >>>>>                            .clusterId("cluster-id").clientId("yourclientid").build());
> >>>>>            final StreamingConnection connection = cf.createConnection();
> >>>>>            Assert.assertNotNull(connection);
> >>>>>
> >>>>>            connection.close();
> >>>>>        }
> >>>>>
> >>>>> It fails with a timeout.
> >>>>>
> >>>>> I monitored the connection with wireshark, and see the following
> >>>>>
> >>>>> < = from server to client
> >>>>> > = from client to server
> >>>>>
> >>>>> < INFO {"server_id":"NDMRYDSGUSH2QR6SZWMFB44ND5CODXGKNYTQ5IPLUGYUDBI6G54CIGF6","server_name":"NDMRYDSGUSH2QR6SZWMFB44ND5CODXGKNYTQ5IPLUGYUDBI6G54CIGF6","version":"2.1.4","proto":1,"git_commit":"fb009af","go":"go1.13.7","host":"0.0.0.0","port":4222,"max_payload":1048576,"client_id":10}
> >>>>> > CONNECT {"lang":"java","version":"2.6.5","name":"yourclientid","protocol":1,"verbose":false,"pedantic":false,"tls_required":false,"echo":true}
> >>>>> > PING
> >>>>> < PONG
> >>>>> > SUB _STAN.acks.LP4bdY88abuVJ19Qo5HVuk 1
> >>>>> > SUB _INBOX.LP4bdY88abuVJ19Qo5HVn8 2
> >>>>> > SUB _INBOX.LP4bdY88abuVJ19Qo5HVqw 3
> >>>>> > SUB _INBOX.F0vVy1N0sQM3xseeEWMIAL.* 4
> >>>>> > PUB _STAN.discover.cluster-id _INBOX.F0vVy1N0sQM3xseeEWMIAL.F0vVy1N0sQM3xseeEWMISH 75
> >>>>>   .yourclientid.._INBOX.LP4bdY88abuVJ19Qo5HVn8..".LP4bdY88abuVJ19Qo5HVjK(.0.
> >>>>> < PING
> >>>>> > PONG
> >>>>> > UNSUB 1
> >>>>> > UNSUB 2
> >>>>> > UNSUB 3
> >>>>>
> >>>>> So there does appear to be some communication between my test and the NATS
> >>>>> server - I have no idea why it times out.
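One reading of the trace above, offered as a sketch rather than a confirmed diagnosis: the client publishes its connect request to `_STAN.discover.cluster-id`, a subject built from the cluster ID passed to the factory. The docker-compose.yml quoted further down starts the server with `-cid yourclientid`, so the server would be expected to answer on `_STAN.discover.yourclientid` instead, and nothing replies before the timeout. The helper `discoverSubject` below is hypothetical and only mirrors the subject naming visible in the trace; it is not part of the NATS client API.

```java
public class DiscoverSubject {
    // Prefix copied from the PUB line in the trace.
    static final String DISCOVER_PREFIX = "_STAN.discover.";

    // Hypothetical helper: the subject a connect request for this cluster ID goes to.
    static String discoverSubject(String clusterId) {
        return DISCOVER_PREFIX + clusterId;
    }

    public static void main(String[] args) {
        String clientSide = discoverSubject("cluster-id");   // value used in the test
        String serverSide = discoverSubject("yourclientid"); // value passed to -cid
        System.out.println(clientSide);
        System.out.println(serverSide);
        System.out.println("match: " + clientSide.equals(serverSide));
    }
}
```

If that reading holds, using the same cluster ID on both sides should let the request reach the server.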
> >>>>>
> >>>>> My code is here if you want to have a go:
> >>>>> https://github.com/jgallimore/tomee-chatterbox/tree/nats
> >>>>>
> >>>>> Jon
> >>>>>
> >>>>> On Wed, Jun 9, 2021 at 11:56 AM Jonathan Gallimore <
> >>>>> jonathan.gallim...@gmail.com> wrote:
> >>>>>
> >>>>>> Never mind, I figured out my mistake. I'll post back when I have something
> >>>>>> going.
> >>>>>>
> >>>>>> Jon
> >>>>>>
> >>>>>> On Wed, Jun 9, 2021 at 11:44 AM Jonathan Gallimore <
> >>>>>> jonathan.gallim...@gmail.com> wrote:
> >>>>>>
> >>>>>>> I think I have something wired up, but when executing this:
> >>>>>>>
> >>>>>>>                cf = new StreamingConnectionFactory(
> >>>>>>>                        new Options.Builder().natsUrl(baseAddress)
> >>>>>>>                                .clusterId("cluster-id").clientId("client-id").build());
> >>>>>>>
> >>>>>>>                connection = cf.createConnection();
> >>>>>>>
> >>>>>>>
> >>>>>>> connection is null. Any pointers?
> >>>>>>>
> >>>>>>> Jon
> >>>>>>>
> >>>>>>> On Wed, Jun 9, 2021 at 8:16 AM Matthew Broadhead
> >>>>>>> <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
> >>>>>>>
> >>>>>>>> i have never used a JCA adapter before.  is it loaded in using the
> >>>>>>>> tomee.xml as a Resource?  and then injected into a singleton for
> >>>>>>>> subscribing to messages?
> >>>>>>>>
> >>>>>>>> On 08/06/2021 17:15, Jonathan Gallimore wrote:
> >>>>>>>>> Definitely sounds like a good case for a JCA adapter. I'll take a quick
> >>>>>>>>> swing at hooking up an example for you.
> >>>>>>>>>
> >>>>>>>>> Jon
> >>>>>>>>>
> >>>>>>>>> On Tue, Jun 8, 2021 at 9:02 AM Matthew Broadhead
> >>>>>>>>> <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Jon,
> >>>>>>>>>>
> >>>>>>>>>> NATS is basically a message queue, like ActiveMQ I suppose.
> >>>>>>>>>>
> >>>>>>>>>> I included the adapter into the project using maven
> >>>>>>>>>> <dependency>
> >>>>>>>>>>           <groupId>io.nats</groupId>
> >>>>>>>>>>           <artifactId>java-nats-streaming</artifactId>
> >>>>>>>>>>           <version>2.2.3</version>
> >>>>>>>>>> </dependency>
> >>>>>>>>>>
> >>>>>>>>>> i started up a nats server using docker.  here is my docker-compose.yml:
> >>>>>>>>>> version: '3.1'
> >>>>>>>>>> services:
> >>>>>>>>>>        nats-docker:
> >>>>>>>>>>          image: nats-streaming:0.17.0
> >>>>>>>>>>          restart: always
> >>>>>>>>>>          command:
> >>>>>>>>>>            - '-p'
> >>>>>>>>>>            - '4222'
> >>>>>>>>>>            - '-m'
> >>>>>>>>>>            - '8222'
> >>>>>>>>>>            - '-hbi'
> >>>>>>>>>>            - '5s'
> >>>>>>>>>>            - '-hbt'
> >>>>>>>>>>            - '5s'
> >>>>>>>>>>            - '-hbf'
> >>>>>>>>>>            - '2'
> >>>>>>>>>>            - '-SD'
> >>>>>>>>>>            - '-cid'
> >>>>>>>>>>            - 'yourclientid'
> >>>>>>>>>>          environment:
> >>>>>>>>>>            TZ: Europe/London
> >>>>>>>>>>            LANG: en_GB.UTF-8
> >>>>>>>>>>            LANGUAGE: en_GB:en
> >>>>>>>>>>            LC_ALL: en_GB.UTF-8
> >>>>>>>>>>          ports:
> >>>>>>>>>>            - '4222:4222'
> >>>>>>>>>>            - '8222:8222'
> >>>>>>>>>>          expose:
> >>>>>>>>>>            - 4222
> >>>>>>>>>>            - 8222
> >>>>>>>>>>          networks:
> >>>>>>>>>>            - backend
> >>>>>>>>>> networks:
> >>>>>>>>>>        backend:
> >>>>>>>>>>          driver: bridge
> >>>>>>>>>>
> >>>>>>>>>> JCA sounds good if it solves the threading issue.  it is very kind of
> >>>>>>>>>> you to offer to help write an adapter.  looking at the code you sent it
> >>>>>>>>>> looks complicated but i can have a stab at it if you don't have much time.
> >>>>>>>>>> let me know if you need more info
> >>>>>>>>>>
> >>>>>>>>>> Matthew
> >>>>>>>>>>
> >>>>>>>>>> On 07/06/2021 17:48, Jonathan Gallimore wrote:
> >>>>>>>>>>> At the risk of sounding a bit ignorant... what is NATS?
> >>>>>>>>>>>
> >>>>>>>>>>> From what I can tell, it sounds like you're receiving a stream of events
> >>>>>>>>>>> (over websocket) and want to do some processing in an EJB or CDI bean for
> >>>>>>>>>>> each event. The connection to the NATS server isn't in the context of a
> >>>>>>>>>>> HTTP (or any other type of) request, and just runs all the time while the
> >>>>>>>>>>> server is running - does that sound about right?
> >>>>>>>>>>>
> >>>>>>>>>>> Assuming that sounds right, it sounds a bit like the Slack JCA connector I
> >>>>>>>>>>> wrote a while back:
> >>>>>>>>>>> https://github.com/apache/tomee-chatterbox/tree/master/chatterbox-slack
> >>>>>>>>>>> Essentially, the resource adapter connects to slack and runs all the time.
> >>>>>>>>>>> Messages that come into the server from slack are processed in MDBs that
> >>>>>>>>>>> implement the InboundListener interface.
> >>>>>>>>>>>
> >>>>>>>>>>> JCA certainly feels complex, especially when compared with your
> >>>>>>>>>>> Singleton @Startup bean approach, but I usually find that if I try and work
> >>>>>>>>>>> with threads in EJBs, things usually go in the wrong direction. Conversely,
> >>>>>>>>>>> JCA even gives you a work manager to potentially handle that stuff.
> >>>>>>>>>>>
> >>>>>>>>>>> If you can give me some pointers to running a NATS server, I'd be happy to
> >>>>>>>>>>> help with a sample adapter and application.
> >>>>>>>>>>>
> >>>>>>>>>>> Jon
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Jun 7, 2021 at 11:49 AM Matthew Broadhead
> >>>>>>>>>>> <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I am trying to subscribe to a NATS streaming server with
> >>>>>>>>>>>> https://github.com/nats-io/stan.java which is java.lang.AutoCloseable.
> >>>>>>>>>>>> At first it wasn't closing properly, as seen in my original gist:
> >>>>>>>>>>>> https://gist.github.com/chongma/2a3ab451f2aeabc98340a9b897394cfe
> >>>>>>>>>>>> This was solved with the approach from
> >>>>>>>>>>>> https://stackoverflow.com/questions/39080296/hazelcast-threads-prevent-tomee-from-stopping
> >>>>>>>>>>>> by creating a default producer:
> >>>>>>>>>>>> @ApplicationScoped
> >>>>>>>>>>>> public class NatsConnectionProducer {
> >>>>>>>>>>>>
> >>>>>>>>>>>>           @Resource(name = "baseAddressNats")
> >>>>>>>>>>>>           private String baseAddressNats;
> >>>>>>>>>>>>
> >>>>>>>>>>>>           @Produces
> >>>>>>>>>>>>           @ApplicationScoped
> >>>>>>>>>>>>           public StreamingConnection instance()
> >>>>>>>>>>>>                   throws IOException, InterruptedException {
> >>>>>>>>>>>>               StreamingConnectionFactory cf = new StreamingConnectionFactory(
> >>>>>>>>>>>>                       new Options.Builder().natsUrl(baseAddressNats)
> >>>>>>>>>>>>                               .clusterId("cluster-id").clientId("client-id").build());
> >>>>>>>>>>>>               return cf.createConnection();
> >>>>>>>>>>>>           }
> >>>>>>>>>>>>
> >>>>>>>>>>>>           public void destroy(@Disposes final StreamingConnection instance)
> >>>>>>>>>>>>                   throws IOException, TimeoutException, InterruptedException {
> >>>>>>>>>>>>               instance.close();
> >>>>>>>>>>>>           }
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> But now I am creating a new thread, because any injections with JPA
> >>>>>>>>>>>> had caching issues. This seems to work, but I am not sure it is
> >>>>>>>>>>>> broadcasting to websockets correctly:
> >>>>>>>>>>>> @Singleton
> >>>>>>>>>>>> @Lock(LockType.READ)
> >>>>>>>>>>>> @Startup
> >>>>>>>>>>>> public class SchedulerEvents {
> >>>>>>>>>>>>           private static final Logger log =
> >>>>>>>>>>>>                   Logger.getLogger(SchedulerEvents.class.getName());
> >>>>>>>>>>>>
> >>>>>>>>>>>>           @Inject
> >>>>>>>>>>>>           private StreamingConnection streamingConnection;
> >>>>>>>>>>>>
> >>>>>>>>>>>>           @Inject
> >>>>>>>>>>>>           private SomeController someController;
> >>>>>>>>>>>>
> >>>>>>>>>>>>           @PostConstruct
> >>>>>>>>>>>>           private void construct() {
> >>>>>>>>>>>> //        log.fine(Thread.currentThread().getName());
> >>>>>>>>>>>>               try {
> >>>>>>>>>>>>                   streamingConnection.subscribe("scheduler:notify", new MessageHandler() {
> >>>>>>>>>>>>                       @Override
> >>>>>>>>>>>>                       public void onMessage(Message m) {
> >>>>>>>>>>>>                           try {
> >>>>>>>>>>>>                               log.fine(Thread.currentThread().getName());
> >>>>>>>>>>>>                               // this needs to spawn a new thread,
> >>>>>>>>>>>>                               // otherwise injections are stale
> >>>>>>>>>>>>                               Thread thread = new Thread(new Runnable() {
> >>>>>>>>>>>>                                   public void run() {
> >>>>>>>>>>>>                                       log.fine(Thread.currentThread().getName());
> >>>>>>>>>>>>                                       process(m.getData());
> >>>>>>>>>>>>                                   }
> >>>>>>>>>>>>                               });
> >>>>>>>>>>>>                               thread.start();
> >>>>>>>>>>>>                               while (thread.isAlive()) {
> >>>>>>>>>>>>                                   // wait
> >>>>>>>>>>>>                               }
> >>>>>>>>>>>>                               log.fine("Thread finished OK");
> >>>>>>>>>>>>                               m.ack();
> >>>>>>>>>>>>                           } catch (Exception e) {
> >>>>>>>>>>>>                               emailController.emailStackTrace(e);
> >>>>>>>>>>>>                           }
> >>>>>>>>>>>>                       }
> >>>>>>>>>>>>                   }, new SubscriptionOptions.Builder().startWithLastReceived().manualAcks()
> >>>>>>>>>>>>                           .ackWait(Duration.ofSeconds(60))
> >>>>>>>>>>>>                           .durableName("scheduler-service").build());
> >>>>>>>>>>>>               } catch (IOException | InterruptedException | TimeoutException e) {
> >>>>>>>>>>>>                   e.printStackTrace();
> >>>>>>>>>>>>               }
> >>>>>>>>>>>>           }
> >>>>>>>>>>>>
> >>>>>>>>>>>>           private void process(byte[] data) {
> >>>>>>>>>>>>               String raw = new String(data);
> >>>>>>>>>>>>               JsonReader jsonReader = Json.createReader(new StringReader(raw));
> >>>>>>>>>>>>               JsonObject jo = jsonReader.readObject();
> >>>>>>>>>>>>               jsonReader.close();
> >>>>>>>>>>>>               String type = utilityDao.readJsonString(jo, "type");
> >>>>>>>>>>>>               int id = utilityDao.readJsonInteger(jo, "id");
> >>>>>>>>>>>>               if (type == null || id == 0) {
> >>>>>>>>>>>>                   emailController.emailThrowable(new Throwable(), raw);
> >>>>>>>>>>>>                   return;
> >>>>>>>>>>>>               }
> >>>>>>>>>>>>               log.info("Received a message: id: " + id + ", type:" + type);
> >>>>>>>>>>>>               DefaultServerEndpointConfigurator dsec = new DefaultServerEndpointConfigurator();
> >>>>>>>>>>>>               SomeWebSocket nws = dsec.getEndpointInstance(SomeWebSocket.class);
> >>>>>>>>>>>>               nws.broadcast(ja.toString());
> >>>>>>>>>>>>           }
> >>>>>>>>>>>>
> >>>>>>>>>>>> }
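On the `while (thread.isAlive())` loop in the handler above: if the aim is only to block until the worker has finished, `Thread.join()` does that without spinning the CPU. A minimal stdlib-only sketch follows; the `runAndWait` helper is hypothetical, and it says nothing about whether spawning threads inside an EJB is a good idea in the first place.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class JoinExample {
    // Hypothetical helper: run the work on a new thread and block until it ends.
    static Thread.State runAndWait(Runnable work) throws InterruptedException {
        Thread thread = new Thread(work, "worker");
        thread.start();
        thread.join(); // waits for the worker without a busy loop
        return thread.getState();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();
        Thread.State state = runAndWait(processed::incrementAndGet);
        System.out.println(state + " after " + processed.get() + " message(s)");
    }
}
```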
> >>>>>>>>>>>>
> >>>>>>>>>>>> what is the best way to use an autocloseable?
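For a resource whose lifetime fits inside a single method, the usual Java idiom for an AutoCloseable is try-with-resources, which calls close() when the block exits, whether normally or by exception. A small sketch, using a hypothetical `DemoConnection` stand-in rather than the NATS client:

```java
import java.util.ArrayList;
import java.util.List;

public class TryWithResourcesExample {
    // Records what happened, so the ordering can be inspected.
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for a connection; not the NATS client API.
    static class DemoConnection implements AutoCloseable {
        void publish(String msg) {
            events.add("publish:" + msg);
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    static void sendOnce(String msg) {
        // close() runs automatically at the end of the try block.
        try (DemoConnection connection = new DemoConnection()) {
            connection.publish(msg);
        }
    }

    public static void main(String[] args) {
        sendOnce("hello");
        System.out.println(events);
    }
}
```

A connection that has to stay open for the life of the application does not fit that shape, so its close() has to be tied to some container lifecycle callback instead, as the @Disposes method in the producer above does.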
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>
>
>
