Nervermind, I figured out my mistake. I'll post back when I have something going.
Jon On Wed, Jun 9, 2021 at 11:44 AM Jonathan Gallimore < jonathan.gallim...@gmail.com> wrote: > I think I have something wired up, but when executing this: > > cf = new > StreamingConnectionFactory(new > Options.Builder().natsUrl(baseAddress) > > .clusterId("cluster-id").clientId("client-id").build()); > > connection = cf.createConnection(); > > > connection is null. Any pointers? > > Jon > > On Wed, Jun 9, 2021 at 8:16 AM Matthew Broadhead > <matthew.broadh...@nbmlaw.co.uk.invalid> wrote: > >> i have never used a JCA adapter before. is it loaded in using the >> tomee.xml as a Resource? and then injected into a singleton for >> subscribing to messages? >> >> On 08/06/2021 17:15, Jonathan Gallimore wrote: >> > Definitely sounds like a good case for a JCA adapter. I'll take a quick >> > swing at hooking up an example for you. >> > >> > Jon >> > >> > On Tue, Jun 8, 2021 at 9:02 AM Matthew Broadhead >> > <matthew.broadh...@nbmlaw.co.uk.invalid> wrote: >> > >> >> Hi Jon, >> >> >> >> NATS is basically a message queue, like ActiveMQ I suppose. >> >> >> >> I included the adapter into the project using maven >> >> <dependency> >> >> <groupId>io.nats</groupId> >> >> <artifactId>java-nats-streaming</artifactId> >> >> <version>2.2.3</version> >> >> </dependency> >> >> >> >> i started up a nats server using docker. here is my docker-compose.yml >> >> version: '3.1' >> >> services: >> >> nats-docker: >> >> image: nats-streaming:0.17.0 >> >> restart: always >> >> command: >> >> - '-p' >> >> - '4222' >> >> - '-m' >> >> - '8222' >> >> - '-hbi' >> >> - '5s' >> >> - '-hbt' >> >> - '5s' >> >> - '-hbf' >> >> - '2' >> >> - '-SD' >> >> - '-cid' >> >> - 'yourclientid' >> >> environment: >> >> TZ: Europe/London >> >> LANG: en_GB.UTF-8 >> >> LANGUAGE: en_GB:en >> >> LC_ALL: en_GB.UTF-8 >> >> ports: >> >> - '4222:4222' >> >> - '8222:8222' >> >> expose: >> >> - 4222 >> >> - 8222 >> >> networks: >> >> - backend >> >> networks: >> >> backend: >> >> driver: bridge >> >> >> >> JCA sounds good if it solves the threading issue. it is very kind of >> >> you to offer to help write an adapter. looking at the code you sent it >> >> looks complicated but i can have a stab at it if you don't have much >> time >> >> >> >> let me know if you need more info >> >> >> >> Matthew >> >> >> >> On 07/06/2021 17:48, Jonathan Gallimore wrote: >> >>> At the risk of sounding a bit ignorant... what is NATS? >> >>> >> >>> From what I can tell, it sounds like you're receiving a stream of >> events >> >>> (over websocket) and want to do some processing in an EJB or CDI bean >> for >> >>> each event. The connection to the NATS server isn't in the context of >> a >> >>> HTTP (or any other type of) request, and just runs all the time while >> the >> >>> server is running - does that sound about right? >> >>> >> >>> Assuming that sounds right, it sounds a bit like the Slack JCA >> connector >> >> I >> >>> wrote a while back: >> >>> >> https://github.com/apache/tomee-chatterbox/tree/master/chatterbox-slack. >> >>> Essentially, the resource adapter connects to slack and runs all the >> >> time. >> >>> Messages that come into the server from slack are processed in MDBs >> that >> >>> implement the InboundListener interface. >> >>> >> >>> JCA certainly feels complex, especially when compared with your >> >>> Singleton @Startup bean approach, but I usually find that if I try and >> >> work >> >>> with threads in EJBs, things usually go in the wrong direction. >> >> Conversely, >> >>> JCA even gives you a work manager to potentially handle that stuff. >> >>> >> >>> If you can give me some pointers to running a NATS server, I'd be >> happy >> >> to >> >>> help with a sample adapter and application. >> >>> >> >>> Jon >> >>> >> >>> On Mon, Jun 7, 2021 at 11:49 AM Matthew Broadhead >> >>> <matthew.broadh...@nbmlaw.co.uk.invalid> wrote: >> >>> >> >>>> I am trying to subscribe to a NATS streaming server with >> >>>> https://github.com/nats-io/stan.java which is >> java.lang.Autocloseable. >> >>>> >> >>>> At first it wasn't closing properly as seen in my original gist: >> >>>> https://gist.github.com/chongma/2a3ab451f2aeabc98340a9b897394cfe >> >>>> >> >>>> This was solved with this >> >>>> >> >>>> >> >> >> https://stackoverflow.com/questions/39080296/hazelcast-threads-prevent-tomee-from-stopping >> >>>> >> >>>> creating a default producer: >> >>>> @ApplicationScoped >> >>>> public class NatsConnectionProducer { >> >>>> >> >>>> @Resource(name = "baseAddressNats") >> >>>> private String baseAddressNats; >> >>>> >> >>>> @Produces >> >>>> @ApplicationScoped >> >>>> public StreamingConnection instance() throws IOException, >> >>>> InterruptedException { >> >>>> StreamingConnectionFactory cf = new >> >>>> StreamingConnectionFactory(new >> >> Options.Builder().natsUrl(baseAddressNats) >> >>>> .clusterId("cluster-id").clientId("client-id").build()); >> >>>> return cf.createConnection(); >> >>>> } >> >>>> >> >>>> public void destroy(@Disposes final StreamingConnection >> instance) >> >>>> throws IOException, TimeoutException, >> >> InterruptedException { >> >>>> instance.close(); >> >>>> } >> >>>> } >> >>>> >> >>>> But now i am creating a new thread because any injections with JPA >> had >> >>>> cacheing issues and this seems to work but i am not sure it is >> >>>> broadcasting to websockets correctly >> >>>> @Singleton >> >>>> @Lock(LockType.READ) >> >>>> @Startup >> >>>> public class SchedulerEvents { >> >>>> private static final Logger log = >> >>>> Logger.getLogger(SchedulerEvents.class.getName()); >> >>>> >> >>>> @Inject >> >>>> private StreamingConnection streamingConnection; >> >>>> >> >>>> @Inject >> >>>> private SomeController someController; >> >>>> >> >>>> @PostConstruct >> >>>> private void construct() { >> >>>> // log.fine(Thread.currentThread().getName()); >> >>>> try { >> >>>> streamingConnection.subscribe("scheduler:notify", new >> >>>> MessageHandler() { >> >>>> @Override >> >>>> public void onMessage(Message m) { >> >>>> try { >> >>>> >> log.fine(Thread.currentThread().getName()); >> >>>> // this needs to spawn a new thread >> otherwise >> >>>> injections are stale >> >>>> Thread thread = new Thread(new Runnable() >> { >> >>>> public void run() { >> >>>> log.fine(Thread.currentThread().getName()); >> >>>> process(m.getData()); >> >>>> } >> >>>> }); >> >>>> thread.start(); >> >>>> while (thread.isAlive()) { >> >>>> // wait >> >>>> } >> >>>> log.fine("Thread finished OK"); >> >>>> m.ack(); >> >>>> } catch (Exception e) { >> >>>> emailController.emailStackTrace(e); >> >>>> } >> >>>> } >> >>>> }, new >> >>>> >> >>>> >> >> >> SubscriptionOptions.Builder().startWithLastReceived().manualAcks().ackWait(Duration.ofSeconds(60)) >> >>>> .durableName("scheduler-service").build()); >> >>>> } catch (IOException | InterruptedException | >> >> TimeoutException e) >> >>>> { >> >>>> e.printStackTrace(); >> >>>> } >> >>>> } >> >>>> >> >>>> private void process(byte[] data) { >> >>>> String raw = new String(data); >> >>>> JsonReader jsonReader = Json.createReader(new >> >> StringReader(raw)); >> >>>> JsonObject jo = jsonReader.readObject(); >> >>>> jsonReader.close(); >> >>>> String type = utilityDao.readJsonString(jo, "type"); >> >>>> int id = utilityDao.readJsonInteger(jo, "id"); >> >>>> if (type == null || id == 0) { >> >>>> emailController.emailThrowable(new Throwable(), raw); >> >>>> return; >> >>>> } >> >>>> log.info("Received a message: id: " + id + ", type:" + >> type); >> >>>> DefaultServerEndpointConfigurator dsec = new >> >>>> DefaultServerEndpointConfigurator(); >> >>>> SomeWebSocket nws = >> >> dsec.getEndpointInstance(SomeWebSocket.class); >> >>>> nws.broadcast(ja.toString()); >> >>>> } >> >>>> >> >>>> } >> >>>> >> >>>> what is the best way to use an autocloseable? >> >>>> >> >>>> >> >> >> >> >> >>