coryvirok opened a new issue #133:
URL: https://github.com/apache/pulsar-client-node/issues/133
First off, thanks for Pulsar and all of these client libs! I'm excited to
vet Pulsar for my company and see if it solves some of Kafka's shortcomings.
I have a gRPC service that implements a server-streaming RPC. This RPC reads
from a Pulsar topic and streams the data from the topic to the gRPC client.
This all works well until I try to shut down the gRPC server gracefully.
I'm finding that the server hangs because of the Pulsar Consumer's synchronous
`receive()` method. The only way I can get it to work is to use
`consumer.receive(timeout)` and inspect the exception message, which seems fairly
non-idiomatic for Node.js and makes it difficult to handle complex, nested
shutdown logic in a real application.
Here's a simplified example:
```js
import Pulsar from "pulsar-client";

const pulsarClient = new Pulsar.Client({
  serviceUrl: "pulsar://127.0.0.1:6650",
});

async function main() {
  const consumer = await pulsarClient.subscribe({
    topic: `test`,
    subscription: "default",
  });

  while (true) {
    try {
      // Since receive() is blocking, use this version in order to give up
      // control and allow the while loop to check if it should finish.
      await consumer.receive(1000);
      console.log("received message");
    } catch (e) {
      // We are forced to use timeout exceptions and `e.message` matching to
      // know when we should retry the call to `receive(timeout)`.
      if (e.message === "Failed to received message TimeOut") {
        console.log("receive() timed out, retrying");
        continue;
      } else if (e.message === "Failed to received message AlreadyClosed") {
        // We are forced to use another exception and `e.message` matching to
        // know when the consumer was closed by the SIGINT handler.
        console.log("consumer closed, breaking loop");
        break;
      }
      // Some other, legitimate exception that we should handle somewhere but
      // that has nothing to do with control flow.
      throw e;
    }
  }

  // Do whatever cleanup needs to happen
  console.log("CLEANING UP....");
  console.log("returning from main");
}

// Trap Ctrl-C from the keyboard.
// This could also be `process.on("beforeExit", () => {})` or any other
// exit or signal handler.
//
// Try commenting this out and seeing whether "CLEANING UP..." is logged
// (spoiler: it isn't).
process.on("SIGINT", async () => {
  console.log("shutting down");
  await pulsarClient.close();
});

// Top-level await requires running this file as an ES module (e.g. main.mjs).
await main();
```
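One way to keep the string matching from leaking into every loop is to confine it to a single helper. The sketch below is a hypothetical `tryReceive()` function (not part of pulsar-client) that turns the timeout and already-closed errors into plain status values. The error strings are copied from the example above and are an assumption that may not hold across client versions.

```javascript
// Error messages as observed in this issue; treat them as an assumption,
// since they may differ between pulsar-client versions.
const TIMEOUT_MESSAGE = "Failed to received message TimeOut";
const CLOSED_MESSAGE = "Failed to received message AlreadyClosed";

// Hypothetical helper (not part of pulsar-client): wraps receive(timeout) and
// maps the two "control-flow" errors to status values instead of exceptions.
async function tryReceive(consumer, timeoutMs) {
  try {
    const msg = await consumer.receive(timeoutMs);
    return { status: "message", msg };
  } catch (e) {
    if (e.message === TIMEOUT_MESSAGE) return { status: "timeout" };
    if (e.message === CLOSED_MESSAGE) return { status: "closed" };
    // Anything else is a genuine failure, not a control-flow signal.
    throw e;
  }
}

// The loop from main() rewritten on top of the helper. `handle` is a
// hypothetical callback that would also acknowledge the message.
async function consumeLoop(consumer, handle) {
  for (;;) {
    const result = await tryReceive(consumer, 1000);
    if (result.status === "timeout") continue;
    if (result.status === "closed") break;
    await handle(result.msg);
  }
}
```

This does not remove the polling, it only moves the exception matching to one place so the caller's loop reads as ordinary control flow.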
I'm not a Node.js expert, but in my experience streams are usually managed
via events. This pattern lets the developer write event-driven code instead
of imperative loops like the one above. That helps because control flow no
longer depends on exception handling and message matching, and the developer
can call `consumer.close()` without worrying whether the synchronous
`consumer.receive()` method is blocking the event loop.
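For comparison, that event-driven style can be approximated on top of the existing `receive(timeout)`. This is a minimal sketch, assuming the same error strings as the example above; `consumerEvents()` and its `stop()` method are hypothetical names, not pulsar-client API. The polling and string matching still happen, but only inside the adapter.

```javascript
const { EventEmitter } = require("node:events");

// Error messages as reported in this issue; an assumption that may change
// between pulsar-client versions.
const TIMEOUT_MESSAGE = "Failed to received message TimeOut";
const CLOSED_MESSAGE = "Failed to received message AlreadyClosed";

// Hypothetical adapter (not part of pulsar-client): polls
// consumer.receive(pollMs) in the background and re-emits the results as
// 'data', 'end' and 'error' events.
function consumerEvents(consumer, { pollMs = 1000 } = {}) {
  const emitter = new EventEmitter();
  let stopped = false;

  async function pump() {
    while (!stopped) {
      let msg;
      try {
        msg = await consumer.receive(pollMs);
      } catch (e) {
        if (e.message === TIMEOUT_MESSAGE) continue; // nothing yet, poll again
        if (e.message === CLOSED_MESSAGE) break; // consumer or client closed
        throw e; // a real failure, reported as 'error' below
      }
      emitter.emit("data", msg);
    }
    emitter.emit("end");
  }

  // Start on the next tick so callers can attach listeners first. As with any
  // EventEmitter, emitting 'error' with no listener throws (here it would
  // surface as an unhandled promise rejection).
  process.nextTick(() => {
    pump().catch((err) => emitter.emit("error", err));
  });

  // Stops polling once the in-flight receive() settles, without closing the
  // consumer; 'end' is emitted when the loop exits.
  emitter.stop = () => {
    stopped = true;
  };
  return emitter;
}
```

If closing the consumer or client makes the pending `receive(timeout)` fail with the AlreadyClosed message, as the loop in the first example suggests, the adapter emits 'end', which is where cleanup would go.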
I understand the desire to use async/await here, and I'm a HUGE fan of the
pattern. But I think it's important for `consumer.receive()` to be async
instead of sync. Otherwise, we're left writing these loops around the
timeout version, `receive(timeout)`.
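Short of a library change, an async generator is one way to keep async/await while hiding the timeout loop, so that `for await` and an `AbortController` drive shutdown. A sketch under the same assumptions: the error strings come from the example above, and `messages()` and `run()` are hypothetical helpers, not pulsar-client API.

```javascript
// Error messages as reported in this issue (an assumption; they may change
// between pulsar-client versions).
const TIMEOUT_MESSAGE = "Failed to received message TimeOut";
const CLOSED_MESSAGE = "Failed to received message AlreadyClosed";

// Hypothetical async generator: yields messages until `signal` is aborted or
// the consumer is closed. The receive() timeout only bounds how long an
// abort can go unnoticed.
async function* messages(consumer, { signal, pollMs = 1000 } = {}) {
  while (!signal?.aborted) {
    let msg;
    try {
      msg = await consumer.receive(pollMs);
    } catch (e) {
      if (e.message === TIMEOUT_MESSAGE) continue; // re-check the signal
      if (e.message === CLOSED_MESSAGE) return; // ends the for-await loop
      throw e; // rethrown into the caller's for-await loop
    }
    yield msg;
  }
}

// Hypothetical usage: the SIGINT handler only aborts; cleanup runs after the
// loop, in whatever order the application needs.
async function run(consumer, handle) {
  const controller = new AbortController();
  process.once("SIGINT", () => controller.abort());
  for await (const msg of messages(consumer, { signal: controller.signal })) {
    await handle(msg);
  }
  // Cleanup goes here, e.g. closing the consumer and then the client.
}
```

The design choice is that the signal handler closes nothing itself, so the code after the `for await` loop owns the shutdown sequence.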
An example of how I'd expect to be able to use the Pulsar Consumer:
```js
const consumer = await pulsarClient.subscribe({
  topic: `test`,
  subscription: "default",
});

consumer.on("data", (msg) => {
  // handle message from consumer
});

consumer.on("end", () => {
  // Clean up any resources that should be cleaned up after a consumer is
  // closed, for various reasons, e.g. the Pulsar server went down, or the
  // SIGINT handler called consumer.close() or pulsarClient.close().
});

consumer.on("error", (err) => {
  // Consumer is no longer usable, but it received an error... do cleanup and
  // report the issue.
});
```
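Node's own `stream.Readable` already emits the 'data', 'end' and 'error' events used in the example above, so one hedged approximation is to wrap the consumer with `Readable.from()`. `consumerStream()` is a hypothetical name, and the error strings are the ones observed in this issue; whether the library should expose something like this natively is the open question here.

```javascript
const { Readable } = require("node:stream");

// Error messages as observed in this issue (an assumption; they may differ
// between pulsar-client versions).
const TIMEOUT_MESSAGE = "Failed to received message TimeOut";
const CLOSED_MESSAGE = "Failed to received message AlreadyClosed";

// Hypothetical adapter (not part of pulsar-client): exposes a consumer as an
// object-mode Readable, which emits 'data', 'end' and 'error'.
function consumerStream(consumer, { pollMs = 1000 } = {}) {
  async function* pull() {
    for (;;) {
      let msg;
      try {
        msg = await consumer.receive(pollMs);
      } catch (e) {
        if (e.message === TIMEOUT_MESSAGE) continue; // poll again
        if (e.message === CLOSED_MESSAGE) return; // stream emits 'end'
        throw e; // stream is destroyed and emits 'error'
      }
      yield msg;
    }
  }
  // Readable.from() defaults to objectMode and only pulls the next message
  // while its internal buffer has room.
  return Readable.from(pull());
}
```

Unlike a bare EventEmitter, a Readable asks the generator for more messages only when its buffer has room, so piping it into a slow Writable also slows down the `receive()` calls.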
My question is: is this the right way to use this library? Am I missing
something, or do we really need to use the sync methods `consumer.receive()`
and `consumer.receive(timeout)` to release control to the main loop so that we
can do things like shut down the process?
Thanks in advance!