BewareMyPower commented on issue #8846:
URL: https://github.com/apache/pulsar/issues/8846#issuecomment-740629832
@ltamber Flexibility comes at the cost of complexity. This is not like
`sendAsync` vs. `send`: `receive(Message&, int)` is simple and sufficient in
most cases. The only reason to use `receiveAsync` is to wait for multiple
messages at once. Even with the Java client, once you choose `receiveAsync`,
the logic is still complicated.
Let's start with some simple client code:
```java
try (PulsarClient client =
        PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
    Consumer<byte[]> consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-sub")
            .subscribe();
    // 100000 just represents a large number
    for (int i = 0; i < 100000; i++) {
        consumer.receiveAsync().whenComplete((msg, e) -> {
            if (e == null) {
                // TODO: process `msg`
            } else {
                // TODO: handle error
            }
        });
    }
} catch (PulsarClientException e) {
    e.printStackTrace();
}
```
The issue is the same: if there are no messages, the pending receives queue
grows without bound until the process runs out of memory.
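To see the growth without a broker, here is a minimal sketch. `FakeConsumer` is a hypothetical stand-in, not part of the Pulsar client API: its `receiveAsync` parks every future in a list and never completes it, which is roughly what happens when the topic has no messages.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

class PendingReceivesDemo {
    // Hypothetical stand-in for a consumer on an empty topic; NOT the Pulsar API.
    static class FakeConsumer {
        final Queue<CompletableFuture<String>> pendingReceives = new ConcurrentLinkedQueue<>();

        CompletableFuture<String> receiveAsync() {
            CompletableFuture<String> future = new CompletableFuture<>();
            pendingReceives.add(future); // nothing ever completes or removes it
            return future;
        }
    }

    // Issues `calls` receives while no message is available and
    // returns how many futures are still pending afterwards.
    static int pendingAfter(int calls) {
        FakeConsumer consumer = new FakeConsumer();
        for (int i = 0; i < calls; i++) {
            consumer.receiveAsync().whenComplete((msg, e) -> {
                // never runs in this sketch: no message ever arrives
            });
        }
        return consumer.pendingReceives.size();
    }

    public static void main(String[] args) {
        System.out.println(pendingAfter(100_000)); // prints 100000
    }
}
```

Every call leaves one more future behind, so the number of pending receives is bounded only by the loop count.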
So you need to manage these futures yourself, e.g.
```java
ArrayBlockingQueue<CompletableFuture<Message<byte[]>>> queue =
        new ArrayBlockingQueue<>(1000);
for (int i = 0; i < 100000; i++) {
    CompletableFuture<Message<byte[]>> future =
            consumer.receiveAsync();
    queue.put(future); // it blocks when the queue is full
    future.whenComplete((msg, e) -> {
        if (e == null) {
            // TODO: process `msg`
        } else {
            // TODO: handle error
        }
        queue.poll(); // just to release a space
    });
}
```
So it's like the first C++ example. The code looks simpler only because
`ArrayBlockingQueue` already encapsulates the condition variable. As for the
second C++ example, use the following instead of `queue.put(future)`:
```java
if (!queue.offer(future)) {
    // TODO: handle the "queue is full" error to make sure the
    // next queue.add won't fail
    queue.add(future);
}
```
You can just wait for a while or, after #8326, call `cancel` here to remove
all pending receives:
```java
if (!queue.offer(future)) {
    queue.forEach(futureInQueue ->
            futureInQueue.cancel(true));
    queue.add(future);
}
```
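The "wait for a while" option can be sketched with the timed `offer(E, long, TimeUnit)` of `ArrayBlockingQueue`, falling back to cancellation only if no slot frees up in time. The `track` helper, the timeout values, and the use of `String` in place of `Message<byte[]>` are assumptions made so the sketch runs without a broker. Unlike the snippet above, it drains the queue explicitly instead of relying on each callback's `queue.poll()`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class TimedOfferDemo {
    // Hypothetical helper: tries to track `future` in the bounded queue,
    // waiting up to `timeoutMs` for a free slot. Returns true if a slot was
    // available in time, false if the pending futures had to be cancelled.
    static boolean track(ArrayBlockingQueue<CompletableFuture<String>> queue,
                         CompletableFuture<String> future,
                         long timeoutMs) throws InterruptedException {
        if (queue.offer(future, timeoutMs, TimeUnit.MILLISECONDS)) {
            return true;
        }
        // Still full after waiting: drain and cancel everything that is pending.
        CompletableFuture<String> pending;
        while ((pending = queue.poll()) != null) {
            pending.cancel(true);
        }
        queue.add(future); // the queue was just drained, so there is room
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<CompletableFuture<String>> queue = new ArrayBlockingQueue<>(2);
        System.out.println(track(queue, new CompletableFuture<>(), 10)); // true
        System.out.println(track(queue, new CompletableFuture<>(), 10)); // true
        System.out.println(track(queue, new CompletableFuture<>(), 10)); // false
        System.out.println(queue.size()); // 1
    }
}
```

In a real consumer loop other threads may refill the queue between the drain and the `add`, so that last step would still need its own error handling.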
The code looks simpler, but only because the Java standard library is more
powerful. You still have to handle the corner cases on the client side
instead of relying on the library.