Nervermind, I figured out my mistake. I'll post back when I have something
going.

Jon

On Wed, Jun 9, 2021 at 11:44 AM Jonathan Gallimore <
jonathan.gallim...@gmail.com> wrote:

> I think I have something wired up, but when executing this:
>
>             cf = new
>                     StreamingConnectionFactory(new
> Options.Builder().natsUrl(baseAddress)
>
> .clusterId("cluster-id").clientId("client-id").build());
>
>             connection = cf.createConnection();
>
>
> connection is null. Any pointers?
>
> Jon
>
> On Wed, Jun 9, 2021 at 8:16 AM Matthew Broadhead
> <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
>
>> i have never used a JCA adapter before.  is it loaded in using the
>> tomee.xml as a Resource?  and then injected into a singleton for
>> subscribing to messages?
>>
>> On 08/06/2021 17:15, Jonathan Gallimore wrote:
>> > Definitely sounds like a good case for a JCA adapter. I'll take a quick
>> > swing at hooking up an example for you.
>> >
>> > Jon
>> >
>> > On Tue, Jun 8, 2021 at 9:02 AM Matthew Broadhead
>> > <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
>> >
>> >> Hi Jon,
>> >>
>> >> NATS is basically a message queue, like ActiveMQ I suppose.
>> >>
>> >> I included the adapter into the project using maven
>> >> <dependency>
>> >>        <groupId>io.nats</groupId>
>> >>        <artifactId>java-nats-streaming</artifactId>
>> >>        <version>2.2.3</version>
>> >> </dependency>
>> >>
>> >> i started up a nats server using docker.  here is my docker-compose.yml
>> >> version: '3.1'
>> >> services:
>> >>     nats-docker:
>> >>       image: nats-streaming:0.17.0
>> >>       restart: always
>> >>       command:
>> >>         - '-p'
>> >>         - '4222'
>> >>         - '-m'
>> >>         - '8222'
>> >>         - '-hbi'
>> >>         - '5s'
>> >>         - '-hbt'
>> >>         - '5s'
>> >>         - '-hbf'
>> >>         - '2'
>> >>         - '-SD'
>> >>         - '-cid'
>> >>         - 'yourclientid'
>> >>       environment:
>> >>         TZ: Europe/London
>> >>         LANG: en_GB.UTF-8
>> >>         LANGUAGE: en_GB:en
>> >>         LC_ALL: en_GB.UTF-8
>> >>       ports:
>> >>         - '4222:4222'
>> >>         - '8222:8222'
>> >>       expose:
>> >>         - 4222
>> >>         - 8222
>> >>       networks:
>> >>         - backend
>> >> networks:
>> >>     backend:
>> >>       driver: bridge
>> >>
>> >> JCA sounds good if it solves the threading issue.  it is very kind of
>> >> you to offer to help write an adapter.  looking at the code you sent it
>> >> looks complicated but i can have a stab at it if you don't have much
>> time
>> >>
>> >> let me know if you need more info
>> >>
>> >> Matthew
>> >>
>> >> On 07/06/2021 17:48, Jonathan Gallimore wrote:
>> >>> At the risk of sounding a bit ignorant... what is NATS?
>> >>>
>> >>>   From what I can tell, it sounds like you're receiving a stream of
>> events
>> >>> (over websocket) and want to do some processing in an EJB or CDI bean
>> for
>> >>> each event. The connection to the NATS server isn't in the context of
>> a
>> >>> HTTP (or any other type of) request, and just runs all the time while
>> the
>> >>> server is running - does that sound about right?
>> >>>
>> >>> Assuming that sounds right, it sounds a bit like the Slack JCA
>> connector
>> >> I
>> >>> wrote a while back:
>> >>>
>> https://github.com/apache/tomee-chatterbox/tree/master/chatterbox-slack.
>> >>> Essentially, the resource adapter connects to slack and runs all the
>> >> time.
>> >>> Messages that come into the server from slack are processed in MDBs
>> that
>> >>> implement the InboundListener interface.
>> >>>
>> >>> JCA certainly feels complex, especially when compared with your
>> >>> Singleton @Startup bean approach, but I usually find that if I try and
>> >> work
>> >>> with threads in EJBs, things usually go in the wrong direction.
>> >> Conversely,
>> >>> JCA even gives you a work manager to potentially handle that stuff.
>> >>>
>> >>> If you can give me some pointers to running a NATS server, I'd be
>> happy
>> >> to
>> >>> help with a sample adapter and application.
>> >>>
>> >>> Jon
>> >>>
>> >>> On Mon, Jun 7, 2021 at 11:49 AM Matthew Broadhead
>> >>> <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
>> >>>
>> >>>> I am trying to subscribe to a NATS streaming server with
>> >>>> https://github.com/nats-io/stan.java which is
>> java.lang.Autocloseable.
>> >>>>
>> >>>> At first it wasn't closing properly as seen in my original gist:
>> >>>> https://gist.github.com/chongma/2a3ab451f2aeabc98340a9b897394cfe
>> >>>>
>> >>>> This was solved with this
>> >>>>
>> >>>>
>> >>
>> https://stackoverflow.com/questions/39080296/hazelcast-threads-prevent-tomee-from-stopping
>> >>>>
>> >>>> creating a default producer:
>> >>>> @ApplicationScoped
>> >>>> public class NatsConnectionProducer {
>> >>>>
>> >>>>        @Resource(name = "baseAddressNats")
>> >>>>        private String baseAddressNats;
>> >>>>
>> >>>>        @Produces
>> >>>>        @ApplicationScoped
>> >>>>        public StreamingConnection instance() throws IOException,
>> >>>> InterruptedException {
>> >>>>            StreamingConnectionFactory cf = new
>> >>>> StreamingConnectionFactory(new
>> >> Options.Builder().natsUrl(baseAddressNats)
>> >>>> .clusterId("cluster-id").clientId("client-id").build());
>> >>>>            return cf.createConnection();
>> >>>>        }
>> >>>>
>> >>>>        public void destroy(@Disposes final StreamingConnection
>> instance)
>> >>>>                throws IOException, TimeoutException,
>> >> InterruptedException {
>> >>>>            instance.close();
>> >>>>        }
>> >>>> }
>> >>>>
>> >>>> But now i am creating a new thread because any injections with JPA
>> had
>> >>>> cacheing issues and this seems to work but i am not sure it is
>> >>>> broadcasting to websockets correctly
>> >>>> @Singleton
>> >>>> @Lock(LockType.READ)
>> >>>> @Startup
>> >>>> public class SchedulerEvents {
>> >>>>        private static final Logger log =
>> >>>> Logger.getLogger(SchedulerEvents.class.getName());
>> >>>>
>> >>>>        @Inject
>> >>>>        private StreamingConnection streamingConnection;
>> >>>>
>> >>>>        @Inject
>> >>>>        private SomeController someController;
>> >>>>
>> >>>>        @PostConstruct
>> >>>>        private void construct() {
>> >>>> //        log.fine(Thread.currentThread().getName());
>> >>>>            try {
>> >>>>                streamingConnection.subscribe("scheduler:notify", new
>> >>>> MessageHandler() {
>> >>>>                    @Override
>> >>>>                    public void onMessage(Message m) {
>> >>>>                        try {
>> >>>>
>> log.fine(Thread.currentThread().getName());
>> >>>>                            // this needs to spawn a new thread
>> otherwise
>> >>>> injections are stale
>> >>>>                            Thread thread = new Thread(new Runnable()
>> {
>> >>>>                                public void run() {
>> >>>> log.fine(Thread.currentThread().getName());
>> >>>>                                    process(m.getData());
>> >>>>                                }
>> >>>>                            });
>> >>>>                            thread.start();
>> >>>>                            while (thread.isAlive()) {
>> >>>>                                // wait
>> >>>>                            }
>> >>>>                            log.fine("Thread finished OK");
>> >>>>                            m.ack();
>> >>>>                        } catch (Exception e) {
>> >>>>                            emailController.emailStackTrace(e);
>> >>>>                        }
>> >>>>                    }
>> >>>>                }, new
>> >>>>
>> >>>>
>> >>
>> SubscriptionOptions.Builder().startWithLastReceived().manualAcks().ackWait(Duration.ofSeconds(60))
>> >>>>                        .durableName("scheduler-service").build());
>> >>>>            } catch (IOException | InterruptedException |
>> >> TimeoutException e)
>> >>>> {
>> >>>>                e.printStackTrace();
>> >>>>            }
>> >>>>        }
>> >>>>
>> >>>>        private void process(byte[] data) {
>> >>>>            String raw = new String(data);
>> >>>>            JsonReader jsonReader = Json.createReader(new
>> >> StringReader(raw));
>> >>>>            JsonObject jo = jsonReader.readObject();
>> >>>>            jsonReader.close();
>> >>>>            String type = utilityDao.readJsonString(jo, "type");
>> >>>>            int id = utilityDao.readJsonInteger(jo, "id");
>> >>>>            if (type == null || id == 0) {
>> >>>>                emailController.emailThrowable(new Throwable(), raw);
>> >>>>                return;
>> >>>>            }
>> >>>>            log.info("Received a message: id: " + id + ", type:" +
>> type);
>> >>>>           DefaultServerEndpointConfigurator dsec = new
>> >>>> DefaultServerEndpointConfigurator();
>> >>>>           SomeWebSocket nws =
>> >> dsec.getEndpointInstance(SomeWebSocket.class);
>> >>>>           nws.broadcast(ja.toString());
>> >>>>        }
>> >>>>
>> >>>> }
>> >>>>
>> >>>> what is the best way to use an autocloseable?
>> >>>>
>> >>>>
>> >>
>> >>
>>
>>

Reply via email to