I think I have something wired up, but when executing this:

            cf = new
                    StreamingConnectionFactory(new
Options.Builder().natsUrl(baseAddress)
                    .clusterId("cluster-id").clientId("client-id").build());

            connection = cf.createConnection();


connection is null. Any pointers?

Jon

On Wed, Jun 9, 2021 at 8:16 AM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

> i have never used a JCA adapter before.  is it loaded in using the
> tomee.xml as a Resource?  and then injected into a singleton for
> subscribing to messages?
>
> On 08/06/2021 17:15, Jonathan Gallimore wrote:
> > Definitely sounds like a good case for a JCA adapter. I'll take a quick
> > swing at hooking up an example for you.
> >
> > Jon
> >
> > On Tue, Jun 8, 2021 at 9:02 AM Matthew Broadhead
> > <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
> >
> >> Hi Jon,
> >>
> >> NATS is basically a message queue, like ActiveMQ I suppose.
> >>
> >> I included the adapter into the project using maven
> >> <dependency>
> >>        <groupId>io.nats</groupId>
> >>        <artifactId>java-nats-streaming</artifactId>
> >>        <version>2.2.3</version>
> >> </dependency>
> >>
> >> i started up a nats server using docker.  here is my docker-compose.yml
> >> version: '3.1'
> >> services:
> >>     nats-docker:
> >>       image: nats-streaming:0.17.0
> >>       restart: always
> >>       command:
> >>         - '-p'
> >>         - '4222'
> >>         - '-m'
> >>         - '8222'
> >>         - '-hbi'
> >>         - '5s'
> >>         - '-hbt'
> >>         - '5s'
> >>         - '-hbf'
> >>         - '2'
> >>         - '-SD'
> >>         - '-cid'
> >>         - 'yourclientid'
> >>       environment:
> >>         TZ: Europe/London
> >>         LANG: en_GB.UTF-8
> >>         LANGUAGE: en_GB:en
> >>         LC_ALL: en_GB.UTF-8
> >>       ports:
> >>         - '4222:4222'
> >>         - '8222:8222'
> >>       expose:
> >>         - 4222
> >>         - 8222
> >>       networks:
> >>         - backend
> >> networks:
> >>     backend:
> >>       driver: bridge
> >>
> >> JCA sounds good if it solves the threading issue.  it is very kind of
> >> you to offer to help write an adapter.  looking at the code you sent it
> >> looks complicated but i can have a stab at it if you don't have much
> time
> >>
> >> let me know if you need more info
> >>
> >> Matthew
> >>
> >> On 07/06/2021 17:48, Jonathan Gallimore wrote:
> >>> At the risk of sounding a bit ignorant... what is NATS?
> >>>
> >>>   From what I can tell, it sounds like you're receiving a stream of
> events
> >>> (over websocket) and want to do some processing in an EJB or CDI bean
> for
> >>> each event. The connection to the NATS server isn't in the context of a
> >>> HTTP (or any other type of) request, and just runs all the time while
> the
> >>> server is running - does that sound about right?
> >>>
> >>> Assuming that sounds right, it sounds a bit like the Slack JCA
> connector
> >> I
> >>> wrote a while back:
> >>>
> https://github.com/apache/tomee-chatterbox/tree/master/chatterbox-slack.
> >>> Essentially, the resource adapter connects to slack and runs all the
> >> time.
> >>> Messages that come into the server from slack are processed in MDBs
> that
> >>> implement the InboundListener interface.
> >>>
> >>> JCA certainly feels complex, especially when compared with your
> >>> Singleton @Startup bean approach, but I usually find that if I try and
> >> work
> >>> with threads in EJBs, things usually go in the wrong direction.
> >> Conversely,
> >>> JCA even gives you a work manager to potentially handle that stuff.
> >>>
> >>> If you can give me some pointers to running a NATS server, I'd be happy
> >> to
> >>> help with a sample adapter and application.
> >>>
> >>> Jon
> >>>
> >>> On Mon, Jun 7, 2021 at 11:49 AM Matthew Broadhead
> >>> <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
> >>>
> >>>> I am trying to subscribe to a NATS streaming server with
> >>>> https://github.com/nats-io/stan.java which is
> java.lang.Autocloseable.
> >>>>
> >>>> At first it wasn't closing properly as seen in my original gist:
> >>>> https://gist.github.com/chongma/2a3ab451f2aeabc98340a9b897394cfe
> >>>>
> >>>> This was solved with this
> >>>>
> >>>>
> >>
> https://stackoverflow.com/questions/39080296/hazelcast-threads-prevent-tomee-from-stopping
> >>>>
> >>>> creating a default producer:
> >>>> @ApplicationScoped
> >>>> public class NatsConnectionProducer {
> >>>>
> >>>>        @Resource(name = "baseAddressNats")
> >>>>        private String baseAddressNats;
> >>>>
> >>>>        @Produces
> >>>>        @ApplicationScoped
> >>>>        public StreamingConnection instance() throws IOException,
> >>>> InterruptedException {
> >>>>            StreamingConnectionFactory cf = new
> >>>> StreamingConnectionFactory(new
> >> Options.Builder().natsUrl(baseAddressNats)
> >>>> .clusterId("cluster-id").clientId("client-id").build());
> >>>>            return cf.createConnection();
> >>>>        }
> >>>>
> >>>>        public void destroy(@Disposes final StreamingConnection
> instance)
> >>>>                throws IOException, TimeoutException,
> >> InterruptedException {
> >>>>            instance.close();
> >>>>        }
> >>>> }
> >>>>
> >>>> But now i am creating a new thread because any injections with JPA had
> >>>> cacheing issues and this seems to work but i am not sure it is
> >>>> broadcasting to websockets correctly
> >>>> @Singleton
> >>>> @Lock(LockType.READ)
> >>>> @Startup
> >>>> public class SchedulerEvents {
> >>>>        private static final Logger log =
> >>>> Logger.getLogger(SchedulerEvents.class.getName());
> >>>>
> >>>>        @Inject
> >>>>        private StreamingConnection streamingConnection;
> >>>>
> >>>>        @Inject
> >>>>        private SomeController someController;
> >>>>
> >>>>        @PostConstruct
> >>>>        private void construct() {
> >>>> //        log.fine(Thread.currentThread().getName());
> >>>>            try {
> >>>>                streamingConnection.subscribe("scheduler:notify", new
> >>>> MessageHandler() {
> >>>>                    @Override
> >>>>                    public void onMessage(Message m) {
> >>>>                        try {
> >>>>                            log.fine(Thread.currentThread().getName());
> >>>>                            // this needs to spawn a new thread
> otherwise
> >>>> injections are stale
> >>>>                            Thread thread = new Thread(new Runnable() {
> >>>>                                public void run() {
> >>>> log.fine(Thread.currentThread().getName());
> >>>>                                    process(m.getData());
> >>>>                                }
> >>>>                            });
> >>>>                            thread.start();
> >>>>                            while (thread.isAlive()) {
> >>>>                                // wait
> >>>>                            }
> >>>>                            log.fine("Thread finished OK");
> >>>>                            m.ack();
> >>>>                        } catch (Exception e) {
> >>>>                            emailController.emailStackTrace(e);
> >>>>                        }
> >>>>                    }
> >>>>                }, new
> >>>>
> >>>>
> >>
> SubscriptionOptions.Builder().startWithLastReceived().manualAcks().ackWait(Duration.ofSeconds(60))
> >>>>                        .durableName("scheduler-service").build());
> >>>>            } catch (IOException | InterruptedException |
> >> TimeoutException e)
> >>>> {
> >>>>                e.printStackTrace();
> >>>>            }
> >>>>        }
> >>>>
> >>>>        private void process(byte[] data) {
> >>>>            String raw = new String(data);
> >>>>            JsonReader jsonReader = Json.createReader(new
> >> StringReader(raw));
> >>>>            JsonObject jo = jsonReader.readObject();
> >>>>            jsonReader.close();
> >>>>            String type = utilityDao.readJsonString(jo, "type");
> >>>>            int id = utilityDao.readJsonInteger(jo, "id");
> >>>>            if (type == null || id == 0) {
> >>>>                emailController.emailThrowable(new Throwable(), raw);
> >>>>                return;
> >>>>            }
> >>>>            log.info("Received a message: id: " + id + ", type:" +
> type);
> >>>>           DefaultServerEndpointConfigurator dsec = new
> >>>> DefaultServerEndpointConfigurator();
> >>>>           SomeWebSocket nws =
> >> dsec.getEndpointInstance(SomeWebSocket.class);
> >>>>           nws.broadcast(ja.toString());
> >>>>        }
> >>>>
> >>>> }
> >>>>
> >>>> what is the best way to use an autocloseable?
> >>>>
> >>>>
> >>
> >>
>
>

Reply via email to