BewareMyPower commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768118453
I've tested in my local environment, it works well. Here're my detail steps:
1. Clone the latest pulsar repository.
2. Change `SampleConsumer.ccand `SampleProducer.cc` under
`pulsar-client-cpp/examples` to
```c++
// SampleProducer.cc (ignore the license header...)
#include <pulsar/Client.h>
#include <lib/LogUtils.h>
DECLARE_LOG_OBJECT()
using namespace pulsar;
int main() {
Client client("pulsar://localhost:6650");
Producer producer;
Result result = client.createProducer("xyz-topic", producer);
if (result != ResultOk) {
LOG_ERROR("Error creating producer: " << result);
return -1;
}
// Send synchronously
const std::string key = "my-key";
for (int i = 0; i < 10; i++) {
const auto msg = MessageBuilder().setContent("msg-" +
std::to_string(i)).setPartitionKey(key).build();
MessageId id;
result = producer.send(msg, id);
if (result == ResultOk) {
LOG_INFO("Send " << msg << " to " << id);
} else {
LOG_ERROR("Failed to send " << msg << ": " << result);
break;
}
}
client.close();
return 0;
}
```
```c++
#include <pulsar/Client.h>
#include <lib/LogUtils.h>
DECLARE_LOG_OBJECT()
using namespace pulsar;
int main(int argc, char* argv[]) {
const std::string subName = (argc > 1) ? argv[1] : "my-sub";
Client client("pulsar://localhost:6650");
Consumer consumer;
ConsumerConfiguration conf;
conf.setReadCompacted(true);
conf.setSubscriptionInitialPosition(InitialPositionEarliest);
Result result = client.subscribe("xyz-topic", subName, conf, consumer);
if (result != ResultOk) {
LOG_ERROR("Failed to subscribe: " << result);
return -1;
}
Message msg;
while (true) {
Result result = consumer.receive(msg, 1000);
if (result == ResultTimeout) {
break;
}
if (result != ResultOk) {
LOG_ERROR("Failed to receive: " << result);
return 1;
}
LOG_INFO("Receive: " << msg.getPartitionKey() << " => " <<
msg.getDataAsString() << " from "
<< msg.getMessageId());
consumer.acknowledge(msg);
}
client.close();
return 0;
}
```
As you can see, the `SampleProducer` send 10 messages (`msg-0` to `msg-9`)
with the same key `my-key` to topic `xyz-topic`. And the `SampleConsumer` try
to consume all messages from the initial position.
3. Build the pulsar using `mvn clean install -DskipTests -Pcore-modules`.
4. Run a test standalone service by running
`pulsar-client-cpp/pulsar-test-service-start.sh`.
5. Build the C++ client using `cmake`.
6. Run `./examples/SampleProducer`.
```
2021-01-27 16:21:28.153 INFO [0x118964dc0] SampleProducer:41 | Send
Message(prod=standalone-0-0, seq=0, publish_time=1611735688045, payload_size=5,
msg_id=(-1,-1,-1,-1), props={}) to (0,0,-1,0)
2021-01-27 16:21:28.158 INFO [0x118964dc0] SampleProducer:41 | Send
Message(prod=standalone-0-0, seq=1, publish_time=1611735688153, payload_size=5,
msg_id=(-1,-1,-1,-1), props={}) to (0,1,-1,0)
2021-01-27 16:21:28.164 INFO [0x118964dc0] SampleProducer:41 | Send
Message(prod=standalone-0-0, seq=2, publish_time=1611735688158, payload_size=5,
msg_id=(-1,-1,-1,-1), props={}) to (0,2,-1,0)
2021-01-27 16:21:28.168 INFO [0x118964dc0] SampleProducer:41 | Send
Message(prod=standalone-0-0, seq=3, publish_time=1611735688164, payload_size=5,
msg_id=(-1,-1,-1,-1), props={}) to (0,3,-1,0)
2021-01-27 16:21:28.175 INFO [0x118964dc0] SampleProducer:41 | Send
Message(prod=standalone-0-0, seq=4, publish_time=1611735688168, payload_size=5,
msg_id=(-1,-1,-1,-1), props={}) to (0,4,-1,0)
2021-01-27 16:21:28.180 INFO [0x118964dc0] SampleProducer:41 | Send
Message(prod=standalone-0-0, seq=5, publish_time=1611735688175, payload_size=5,
msg_id=(-1,-1,-1,-1), props={}) to (0,5,-1,0)
2021-01-27 16:21:28.185 INFO [0x118964dc0] SampleProducer:41 | Send
Message(prod=standalone-0-0, seq=6, publish_time=1611735688180, payload_size=5,
msg_id=(-1,-1,-1,-1), props={}) to (0,6,-1,0)
2021-01-27 16:21:28.191 INFO [0x118964dc0] SampleProducer:41 | Send
Message(prod=standalone-0-0, seq=7, publish_time=1611735688185, payload_size=5,
msg_id=(-1,-1,-1,-1), props={}) to (0,7,-1,0)
2021-01-27 16:21:28.195 INFO [0x118964dc0] SampleProducer:41 | Send
Message(prod=standalone-0-0, seq=8, publish_time=1611735688191, payload_size=5,
msg_id=(-1,-1,-1,-1), props={}) to (0,8,-1,0)
2021-01-27 16:21:28.200 INFO [0x118964dc0] SampleProducer:41 | Send
Message(prod=standalone-0-0, seq=9, publish_time=1611735688195, payload_size=5,
msg_id=(-1,-1,-1,-1), props={}) to (0,9,-1,0)
```
8. Run `./examples/SampleConsumer`
```
2021-01-27 16:21:31.886 INFO [0x102fd1dc0] SampleConsumer:49 | Receive:
my-key => msg-0 from (0,0,-1,0)
2021-01-27 16:21:31.887 INFO [0x102fd1dc0] SampleConsumer:49 | Receive:
my-key => msg-1 from (0,1,-1,0)
2021-01-27 16:21:31.887 INFO [0x102fd1dc0] SampleConsumer:49 | Receive:
my-key => msg-2 from (0,2,-1,0)
2021-01-27 16:21:31.887 INFO [0x102fd1dc0] SampleConsumer:49 | Receive:
my-key => msg-3 from (0,3,-1,0)
2021-01-27 16:21:31.887 INFO [0x102fd1dc0] SampleConsumer:49 | Receive:
my-key => msg-4 from (0,4,-1,0)
2021-01-27 16:21:31.887 INFO [0x102fd1dc0] SampleConsumer:49 | Receive:
my-key => msg-5 from (0,5,-1,0)
2021-01-27 16:21:31.887 INFO [0x102fd1dc0] SampleConsumer:49 | Receive:
my-key => msg-6 from (0,6,-1,0)
2021-01-27 16:21:31.887 INFO [0x102fd1dc0] SampleConsumer:49 | Receive:
my-key => msg-7 from (0,7,-1,0)
2021-01-27 16:21:31.887 INFO [0x102fd1dc0] SampleConsumer:49 | Receive:
my-key => msg-8 from (0,8,-1,0)
2021-01-27 16:21:31.887 INFO [0x102fd1dc0] SampleConsumer:49 | Receive:
my-key => msg-9 from (0,9,-1,0)
```
9. Go to the project directory and run `bin/pulsar-admin topics compact
xyz-topic` to compact this topic.
10. Go back to C++ client build directory and run `./examples/SampleConsumer
new-sub`
```
2021-01-27 16:21:46.097 INFO [0x700005d52000] ConsumerImpl:216 |
[persistent://public/default/xyz-topic, new-sub, 0] Created consumer on broker
[127.0.0.1:54843 -> 127.0.0.1:6650]
2021-01-27 16:21:46.114 INFO [0x10ce3ddc0] SampleConsumer:49 | Receive:
my-key => msg-9 from (0,9,-1,0)
2021-01-27 16:21:47.116 INFO [0x10ce3ddc0] ClientImpl:480 | Closing Pulsar
client
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]