I have never used a JCA adapter before. Is it loaded via tomee.xml as a Resource, and then injected into a singleton to subscribe to messages?

On 08/06/2021 17:15, Jonathan Gallimore wrote:
Definitely sounds like a good case for a JCA adapter. I'll take a quick
swing at hooking up an example for you.

Jon

On Tue, Jun 8, 2021 at 9:02 AM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

Hi Jon,

NATS is basically a message queue, like ActiveMQ I suppose.

I included the client library in the project using Maven:
<dependency>
       <groupId>io.nats</groupId>
       <artifactId>java-nats-streaming</artifactId>
       <version>2.2.3</version>
</dependency>

I started up a NATS server using Docker. Here is my docker-compose.yml:
version: '3.1'
services:
    nats-docker:
      image: nats-streaming:0.17.0
      restart: always
      command:
        - '-p'
        - '4222'
        - '-m'
        - '8222'
        - '-hbi'
        - '5s'
        - '-hbt'
        - '5s'
        - '-hbf'
        - '2'
        - '-SD'
        - '-cid'
        - 'yourclientid'
      environment:
        TZ: Europe/London
        LANG: en_GB.UTF-8
        LANGUAGE: en_GB:en
        LC_ALL: en_GB.UTF-8
      ports:
        - '4222:4222'
        - '8222:8222'
      expose:
        - 4222
        - 8222
      networks:
        - backend
networks:
    backend:
      driver: bridge

JCA sounds good if it solves the threading issue. It is very kind of
you to offer to help write an adapter. Looking at the code you sent, it
looks complicated, but I can have a stab at it if you don't have much time.

Let me know if you need more info.

Matthew

On 07/06/2021 17:48, Jonathan Gallimore wrote:
At the risk of sounding a bit ignorant... what is NATS?

From what I can tell, it sounds like you're receiving a stream of events
(over websocket) and want to do some processing in an EJB or CDI bean for
each event. The connection to the NATS server isn't in the context of a
HTTP (or any other type of) request, and just runs all the time while the
server is running - does that sound about right?

Assuming that sounds right, it sounds a bit like the Slack JCA connector I
wrote a while back:
https://github.com/apache/tomee-chatterbox/tree/master/chatterbox-slack
Essentially, the resource adapter connects to Slack and runs all the time.
Messages that come into the server from Slack are processed in MDBs that
implement the InboundListener interface.

JCA certainly feels complex, especially when compared with your
Singleton @Startup bean approach, but I usually find that if I try and work
with threads in EJBs, things usually go in the wrong direction. Conversely,
JCA even gives you a work manager to potentially handle that stuff.

If you can give me some pointers to running a NATS server, I'd be happy to
help with a sample adapter and application.

Jon

On Mon, Jun 7, 2021 at 11:49 AM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

I am trying to subscribe to a NATS streaming server with
https://github.com/nats-io/stan.java, whose connection is a java.lang.AutoCloseable.

At first it wasn't closing properly as seen in my original gist:
https://gist.github.com/chongma/2a3ab451f2aeabc98340a9b897394cfe

This was solved by following
https://stackoverflow.com/questions/39080296/hazelcast-threads-prevent-tomee-from-stopping
and creating a default producer:
@ApplicationScoped
public class NatsConnectionProducer {

       @Resource(name = "baseAddressNats")
       private String baseAddressNats;

       @Produces
       @ApplicationScoped
       public StreamingConnection instance() throws IOException, InterruptedException {
           StreamingConnectionFactory cf = new StreamingConnectionFactory(
                   new Options.Builder().natsUrl(baseAddressNats)
                           .clusterId("cluster-id").clientId("client-id").build());
           return cf.createConnection();
       }

       public void destroy(@Disposes final StreamingConnection instance)
               throws IOException, TimeoutException, InterruptedException {
           instance.close();
       }
}

But now I am creating a new thread, because any injections with JPA had
caching issues. This seems to work, but I am not sure it is
broadcasting to websockets correctly:
@Singleton
@Lock(LockType.READ)
@Startup
public class SchedulerEvents {
       private static final Logger log = Logger.getLogger(SchedulerEvents.class.getName());

       @Inject
       private StreamingConnection streamingConnection;

       @Inject
       private SomeController someController;

       @PostConstruct
       private void construct() {
//        log.fine(Thread.currentThread().getName());
           try {
               streamingConnection.subscribe("scheduler:notify", new MessageHandler() {
                   @Override
                   public void onMessage(Message m) {
                       try {
                           log.fine(Thread.currentThread().getName());
                           // this needs to spawn a new thread, otherwise injections are stale
                           Thread thread = new Thread(new Runnable() {
                               public void run() {
                                   log.fine(Thread.currentThread().getName());
                                   process(m.getData());
                               }
                           });
                           thread.start();
                           // block until the worker finishes instead of spinning on isAlive()
                           thread.join();
                           log.fine("Thread finished OK");
                           m.ack();
                       } catch (Exception e) {
                           emailController.emailStackTrace(e);
                       }
                   }
                   }, new SubscriptionOptions.Builder().startWithLastReceived().manualAcks()
                       .ackWait(Duration.ofSeconds(60)).durableName("scheduler-service").build());
           } catch (IOException | InterruptedException | TimeoutException e) {
               e.printStackTrace();
           }
       }

       private void process(byte[] data) {
           String raw = new String(data);
           JsonReader jsonReader = Json.createReader(new StringReader(raw));
           JsonObject jo = jsonReader.readObject();
           jsonReader.close();
           String type = utilityDao.readJsonString(jo, "type");
           int id = utilityDao.readJsonInteger(jo, "id");
           if (type == null || id == 0) {
               emailController.emailThrowable(new Throwable(), raw);
               return;
           }
           log.info("Received a message: id: " + id + ", type:" + type);
           DefaultServerEndpointConfigurator dsec = new DefaultServerEndpointConfigurator();
           SomeWebSocket nws = dsec.getEndpointInstance(SomeWebSocket.class);
           nws.broadcast(ja.toString());
       }

}
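
The hand-off in onMessage above (run the processing on another thread, wait
for it, then ack) can also be sketched with an ExecutorService and
Future.get, which blocks without spinning. This is only a minimal sketch
using plain java.util.concurrent: BlockingHandOff, handle and the counter are
hypothetical names, process() is a stand-in for the real method, and inside
TomEE a container-managed executor (for example an injected
ManagedExecutorService) would normally be preferred over Executors. Whether
it resolves the stale-injection problem is not something this sketch shows.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingHandOff {
    // Hypothetical counter so the effect of process() is observable.
    public static final AtomicInteger processed = new AtomicInteger();

    // Stand-in for the real process(byte[]) in SchedulerEvents.
    public static void process(byte[] data) {
        String raw = new String(data, StandardCharsets.UTF_8);
        if (!raw.isEmpty()) {
            processed.incrementAndGet();
        }
    }

    // Runs process() on the worker and blocks until it finishes.
    // Returns true when it completed normally, i.e. when an ack would be safe.
    public static boolean handle(ExecutorService worker, byte[] data) {
        Future<?> result = worker.submit(() -> process(data));
        try {
            // The 60 second limit mirrors the ackWait used in the subscription.
            result.get(60, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false; // processing failed or took too long: do not ack
        }
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            byte[] msg = "{\"type\":\"job\",\"id\":1}".getBytes(StandardCharsets.UTF_8);
            boolean ok = handle(worker, msg);
            System.out.println("ack=" + ok + " processed=" + processed.get());
        } finally {
            worker.shutdown(); // otherwise the worker thread keeps the JVM alive
        }
    }
}
```

In the real handler, m.ack() would be called only when handle(...) returns
true, so a failed or timed-out message is redelivered after ackWait.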

What is the best way to use an AutoCloseable?
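
For reference, the two usual patterns look roughly like this in plain Java.
This is a sketch under assumptions, not the stan.java API: DemoConnection and
Owner are hypothetical stand-ins. A short-lived resource fits
try-with-resources; a long-lived one (like a connection that stays subscribed
while the server runs) needs a single owner that opens it on startup and
closes it on shutdown, which is the role the @Produces/@Disposes pair or a
@PostConstruct/@PreDestroy pair plays in the container.

```java
import java.util.ArrayList;
import java.util.List;

public class AutoCloseableLifecycle {
    // Hypothetical connection that records what happens to it.
    public static class DemoConnection implements AutoCloseable {
        private final List<String> log;
        private boolean open = true;

        public DemoConnection(List<String> log) {
            this.log = log;
            log.add("open");
        }

        public void publish(String msg) {
            if (!open) {
                throw new IllegalStateException("connection is closed");
            }
            log.add("publish:" + msg);
        }

        @Override
        public void close() {
            if (open) { // idempotent: a second close() is a no-op
                open = false;
                log.add("close");
            }
        }
    }

    // Short-lived use: close() runs when the block exits, even on an exception.
    public static List<String> shortLived() {
        List<String> log = new ArrayList<>();
        try (DemoConnection c = new DemoConnection(log)) {
            c.publish("hello");
        }
        return log;
    }

    // Long-lived use: one owner opens in start() and closes in stop().
    public static class Owner {
        private final List<String> log = new ArrayList<>();
        private DemoConnection connection;

        public void start() { connection = new DemoConnection(log); }
        public void send(String msg) { connection.publish(msg); }
        public void stop() { if (connection != null) connection.close(); }
        public List<String> log() { return log; }
    }

    public static void main(String[] args) {
        System.out.println(shortLived()); // prints [open, publish:hello, close]
        Owner owner = new Owner();
        owner.start();
        owner.send("a");
        owner.stop();
        System.out.println(owner.log()); // prints [open, publish:a, close]
    }
}
```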




