hi, it was my fault putting a confusing value in the docker compose file.  it should work like this

StreamingConnectionFactory cf = new
 StreamingConnectionFactory(new Options.Builder().natsUrl("nats://localhost:4222")
 .clusterId("yourclientid").clientId("anything").build());

but you could change the docker-compose.yml to have a -cid of yourclusterid and then do this

StreamingConnectionFactory cf = new
 StreamingConnectionFactory(new Options.Builder().natsUrl("nats://localhost:4222")
 .clusterId("yourclusterid").clientId("yourclientid").build());

On 09/06/2021 17:15, Jonathan Gallimore wrote:
Thanks. If I can get that test going, I can probably get the rest working.
I suspect there are some other bugs in there.

Jon

On Wed, Jun 9, 2021 at 4:04 PM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

Thanks I will have a look now

On 09/06/2021 15:19, Jonathan Gallimore wrote:
Ok, I'm stuck. If I boot up a NATS server with your docker-compose.yml
file, and run the following test:

      @Test
      public void testShouldConnect() throws Exception {
          StreamingConnectionFactory cf = new
                  StreamingConnectionFactory(new
Options.Builder().natsUrl("nats://localhost:4222")

.clusterId("cluster-id").clientId("yourclientid").build());
          final StreamingConnection connection = cf.createConnection();
          Assert.assertNotNull(connection);

          connection.close();
      }

It fails with a timeout.

I monitored the connection with wireshark, and see the following

< = from server to client
= from client to server
<INFO

<{"server_id":"NDMRYDSGUSH2QR6SZWMFB44ND5CODXGKNYTQ5IPLUGYUDBI6G54CIGF6","server_name":"NDMRYDSGUSH2QR6SZWMFB44ND5CODXGKNYTQ5IPLUGYUDBI6G54CIGF6","version":"2.1.4","proto":1,"git_commit":"fb009af","go":"go1.13.7","host":"0.0.0.0","port":4222,"max_payload":1048576,"client_id":10}
CONNECT
{"lang":"java","version":"2.6.5","name":"yourclientid","protocol":1,"verbose":false,"pedantic":false,"tls_required":false,"echo":true}
PING
<PONG
SUB _STAN.acks.LP4bdY88abuVJ19Qo5HVuk 1
SUB _INBOX.LP4bdY88abuVJ19Qo5HVn8 2
SUB _INBOX.LP4bdY88abuVJ19Qo5HVqw 3
SUB _INBOX.F0vVy1N0sQM3xseeEWMIAL.* 4
PUB _STAN.discover.cluster-id
_INBOX.F0vVy1N0sQM3xseeEWMIAL.F0vVy1N0sQM3xseeEWMISH 75
.yourclientid.._INBOX.LP4bdY88abuVJ19Qo5HVn8..".LP4bdY88abuVJ19Qo5HVjK(.0.
<PING
PONG
UNSUB 1
UNSUB 2
UNSUB 3
So there does appear to be some communication between my test and the
NATS
server - I have no idea why it times out.

My code is here if you want to have a go:
https://github.com/jgallimore/tomee-chatterbox/tree/nats

Jon

On Wed, Jun 9, 2021 at 11:56 AM Jonathan Gallimore <
jonathan.gallim...@gmail.com> wrote:

Nervermind, I figured out my mistake. I'll post back when I have
something
going.

Jon

On Wed, Jun 9, 2021 at 11:44 AM Jonathan Gallimore <
jonathan.gallim...@gmail.com> wrote:

I think I have something wired up, but when executing this:

              cf = new
                      StreamingConnectionFactory(new
Options.Builder().natsUrl(baseAddress)

.clusterId("cluster-id").clientId("client-id").build());

              connection = cf.createConnection();


connection is null. Any pointers?

Jon

On Wed, Jun 9, 2021 at 8:16 AM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

i have never used a JCA adapter before.  is it loaded in using the
tomee.xml as a Resource?  and then injected into a singleton for
subscribing to messages?

On 08/06/2021 17:15, Jonathan Gallimore wrote:
Definitely sounds like a good case for a JCA adapter. I'll take a
quick
swing at hooking up an example for you.

Jon

On Tue, Jun 8, 2021 at 9:02 AM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

Hi Jon,

NATS is basically a message queue, like ActiveMQ I suppose.

I included the adapter into the project using maven
<dependency>
         <groupId>io.nats</groupId>
         <artifactId>java-nats-streaming</artifactId>
         <version>2.2.3</version>
</dependency>

i started up a nats server using docker.  here is my
docker-compose.yml
version: '3.1'
services:
      nats-docker:
        image: nats-streaming:0.17.0
        restart: always
        command:
          - '-p'
          - '4222'
          - '-m'
          - '8222'
          - '-hbi'
          - '5s'
          - '-hbt'
          - '5s'
          - '-hbf'
          - '2'
          - '-SD'
          - '-cid'
          - 'yourclientid'
        environment:
          TZ: Europe/London
          LANG: en_GB.UTF-8
          LANGUAGE: en_GB:en
          LC_ALL: en_GB.UTF-8
        ports:
          - '4222:4222'
          - '8222:8222'
        expose:
          - 4222
          - 8222
        networks:
          - backend
networks:
      backend:
        driver: bridge

JCA sounds good if it solves the threading issue.  it is very kind
of
you to offer to help write an adapter.  looking at the code you sent
it
looks complicated but i can have a stab at it if you don't have much
time
let me know if you need more info

Matthew

On 07/06/2021 17:48, Jonathan Gallimore wrote:
At the risk of sounding a bit ignorant... what is NATS?

    From what I can tell, it sounds like you're receiving a stream
of
events
(over websocket) and want to do some processing in an EJB or CDI
bean for
each event. The connection to the NATS server isn't in the context
of a
HTTP (or any other type of) request, and just runs all the time
while the
server is running - does that sound about right?

Assuming that sounds right, it sounds a bit like the Slack JCA
connector
I
wrote a while back:

https://github.com/apache/tomee-chatterbox/tree/master/chatterbox-slack.
Essentially, the resource adapter connects to slack and runs all
the
time.
Messages that come into the server from slack are processed in MDBs
that
implement the InboundListener interface.

JCA certainly feels complex, especially when compared with your
Singleton @Startup bean approach, but I usually find that if I try
and
work
with threads in EJBs, things usually go in the wrong direction.
Conversely,
JCA even gives you a work manager to potentially handle that stuff.

If you can give me some pointers to running a NATS server, I'd be
happy
to
help with a sample adapter and application.

Jon

On Mon, Jun 7, 2021 at 11:49 AM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

I am trying to subscribe to a NATS streaming server with
https://github.com/nats-io/stan.java which is
java.lang.Autocloseable.
At first it wasn't closing properly as seen in my original gist:
https://gist.github.com/chongma/2a3ab451f2aeabc98340a9b897394cfe

This was solved with this


https://stackoverflow.com/questions/39080296/hazelcast-threads-prevent-tomee-from-stopping
creating a default producer:
@ApplicationScoped
public class NatsConnectionProducer {

         @Resource(name = "baseAddressNats")
         private String baseAddressNats;

         @Produces
         @ApplicationScoped
         public StreamingConnection instance() throws IOException,
InterruptedException {
             StreamingConnectionFactory cf = new
StreamingConnectionFactory(new
Options.Builder().natsUrl(baseAddressNats)
.clusterId("cluster-id").clientId("client-id").build());
             return cf.createConnection();
         }

         public void destroy(@Disposes final StreamingConnection
instance)
                 throws IOException, TimeoutException,
InterruptedException {
             instance.close();
         }
}

But now i am creating a new thread because any injections with JPA
had
cacheing issues and this seems to work but i am not sure it is
broadcasting to websockets correctly
@Singleton
@Lock(LockType.READ)
@Startup
public class SchedulerEvents {
         private static final Logger log =
Logger.getLogger(SchedulerEvents.class.getName());

         @Inject
         private StreamingConnection streamingConnection;

         @Inject
         private SomeController someController;

         @PostConstruct
         private void construct() {
//        log.fine(Thread.currentThread().getName());
             try {
                 streamingConnection.subscribe("scheduler:notify",
new
MessageHandler() {
                     @Override
                     public void onMessage(Message m) {
                         try {

log.fine(Thread.currentThread().getName());
                             // this needs to spawn a new thread
otherwise
injections are stale
                             Thread thread = new Thread(new
Runnable() {
                                 public void run() {
log.fine(Thread.currentThread().getName());
                                     process(m.getData());
                                 }
                             });
                             thread.start();
                             while (thread.isAlive()) {
                                 // wait
                             }
                             log.fine("Thread finished OK");
                             m.ack();
                         } catch (Exception e) {
                             emailController.emailStackTrace(e);
                         }
                     }
                 }, new


SubscriptionOptions.Builder().startWithLastReceived().manualAcks().ackWait(Duration.ofSeconds(60))
  .durableName("scheduler-service").build());
             } catch (IOException | InterruptedException |
TimeoutException e)
{
                 e.printStackTrace();
             }
         }

         private void process(byte[] data) {
             String raw = new String(data);
             JsonReader jsonReader = Json.createReader(new
StringReader(raw));
             JsonObject jo = jsonReader.readObject();
             jsonReader.close();
             String type = utilityDao.readJsonString(jo, "type");
             int id = utilityDao.readJsonInteger(jo, "id");
             if (type == null || id == 0) {
                 emailController.emailThrowable(new Throwable(),
raw);
                 return;
             }
             log.info("Received a message: id: " + id + ", type:"
+
type);
            DefaultServerEndpointConfigurator dsec = new
DefaultServerEndpointConfigurator();
            SomeWebSocket nws =
dsec.getEndpointInstance(SomeWebSocket.class);
            nws.broadcast(ja.toString());
         }

}

what is the best way to use an autocloseable?




Reply via email to