At the risk of sounding a bit ignorant... what is NATS? >From what I can tell, it sounds like you're receiving a stream of events (over websocket) and want to do some processing in an EJB or CDI bean for each event. The connection to the NATS server isn't in the context of a HTTP (or any other type of) request, and just runs all the time while the server is running - does that sound about right?
Assuming that sounds right, it sounds a bit like the Slack JCA connector I wrote a while back: https://github.com/apache/tomee-chatterbox/tree/master/chatterbox-slack. Essentially, the resource adapter connects to slack and runs all the time. Messages that come into the server from slack are processed in MDBs that implement the InboundListener interface. JCA certainly feels complex, especially when compared with your Singleton @Startup bean approach, but I usually find that if I try and work with threads in EJBs, things usually go in the wrong direction. Conversely, JCA even gives you a work manager to potentially handle that stuff. If you can give me some pointers to running a NATS server, I'd be happy to help with a sample adapter and application. Jon On Mon, Jun 7, 2021 at 11:49 AM Matthew Broadhead <matthew.broadh...@nbmlaw.co.uk.invalid> wrote: > I am trying to subscribe to a NATS streaming server with > https://github.com/nats-io/stan.java which is java.lang.Autocloseable. > > At first it wasn't closing properly as seen in my original gist: > https://gist.github.com/chongma/2a3ab451f2aeabc98340a9b897394cfe > > This was solved with this > > https://stackoverflow.com/questions/39080296/hazelcast-threads-prevent-tomee-from-stopping > > > creating a default producer: > @ApplicationScoped > public class NatsConnectionProducer { > > @Resource(name = "baseAddressNats") > private String baseAddressNats; > > @Produces > @ApplicationScoped > public StreamingConnection instance() throws IOException, > InterruptedException { > StreamingConnectionFactory cf = new > StreamingConnectionFactory(new Options.Builder().natsUrl(baseAddressNats) > .clusterId("cluster-id").clientId("client-id").build()); > return cf.createConnection(); > } > > public void destroy(@Disposes final StreamingConnection instance) > throws IOException, TimeoutException, InterruptedException { > instance.close(); > } > } > > But now i am creating a new thread because any injections with JPA had > cacheing issues and this seems to work but i am not sure it is > broadcasting to websockets correctly > @Singleton > @Lock(LockType.READ) > @Startup > public class SchedulerEvents { > private static final Logger log = > Logger.getLogger(SchedulerEvents.class.getName()); > > @Inject > private StreamingConnection streamingConnection; > > @Inject > private SomeController someController; > > @PostConstruct > private void construct() { > // log.fine(Thread.currentThread().getName()); > try { > streamingConnection.subscribe("scheduler:notify", new > MessageHandler() { > @Override > public void onMessage(Message m) { > try { > log.fine(Thread.currentThread().getName()); > // this needs to spawn a new thread otherwise > injections are stale > Thread thread = new Thread(new Runnable() { > public void run() { > log.fine(Thread.currentThread().getName()); > process(m.getData()); > } > }); > thread.start(); > while (thread.isAlive()) { > // wait > } > log.fine("Thread finished OK"); > m.ack(); > } catch (Exception e) { > emailController.emailStackTrace(e); > } > } > }, new > > SubscriptionOptions.Builder().startWithLastReceived().manualAcks().ackWait(Duration.ofSeconds(60)) > .durableName("scheduler-service").build()); > } catch (IOException | InterruptedException | TimeoutException e) > { > e.printStackTrace(); > } > } > > private void process(byte[] data) { > String raw = new String(data); > JsonReader jsonReader = Json.createReader(new StringReader(raw)); > JsonObject jo = jsonReader.readObject(); > jsonReader.close(); > String type = utilityDao.readJsonString(jo, "type"); > int id = utilityDao.readJsonInteger(jo, "id"); > if (type == null || id == 0) { > emailController.emailThrowable(new Throwable(), raw); > return; > } > log.info("Received a message: id: " + id + ", type:" + type); > DefaultServerEndpointConfigurator dsec = new > DefaultServerEndpointConfigurator(); > SomeWebSocket nws = dsec.getEndpointInstance(SomeWebSocket.class); > nws.broadcast(ja.toString()); > } > > } > > what is the best way to use an autocloseable? > >