2018-12-06 10:52:17 UTC - Olivier Chicha: @Ivan Kelly thanks for the remark, we 
do indeed have a weak point here.
I think what we are going to do is store the changes in a dedicated table of 
the DB during the transaction and then remove them once they are published.
If the service that produced the events crashes, they will be published by a 
dedicated service that continuously checks that every "pending event" belongs 
to a producing service that is still alive; otherwise it will take 
responsibility for dispatching the event itself (and then remove it from the DB).
I guess this dedicated service could also consume the events sent to the topic 
and delete the corresponding entries from the DB, to avoid sending the same 
event twice.
Would you have a simpler way to suggest? I guess Pulsar will soon be able to 
use MySQL as a source, and that could be another way of doing it.
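
A minimal sketch of the pending-events table described above, assuming an in-memory map in place of the real DB table and a plain callback in place of a Pulsar producer. The class and method names (`PendingEventOutbox`, `recordInTransaction`, `markPublished`, `dispatchPending`) are hypothetical and only illustrate the flow: record during the transaction, remove once published, and let a recovery pass dispatch whatever is left.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** In-memory stand-in for the "pending events" table; all names here are hypothetical. */
final class PendingEventOutbox {
    // Insertion-ordered map playing the role of the dedicated DB table (event id -> payload).
    private final Map<Long, String> pending = new LinkedHashMap<>();
    private long nextId = 1;

    /** Called inside the business transaction: record the event instead of publishing it directly. */
    synchronized long recordInTransaction(String payload) {
        long id = nextId++;
        pending.put(id, payload);
        return id;
    }

    /** Called once the event is known to be published (for example after reading it back from the topic). */
    synchronized void markPublished(long id) {
        pending.remove(id);
    }

    /** Recovery pass: publish whatever is still pending, removing each row only after the publish returns. */
    synchronized int dispatchPending(Consumer<String> publisher) {
        List<Long> ids = new ArrayList<>(pending.keySet());
        int dispatched = 0;
        for (long id : ids) {
            publisher.accept(pending.get(id)); // may throw; the row then stays for the next pass
            pending.remove(id);
            dispatched++;
        }
        return dispatched;
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingEventOutbox outbox = new PendingEventOutbox();
        long first = outbox.recordInTransaction("order-created");
        outbox.recordInTransaction("order-paid");
        outbox.markPublished(first); // the original service published this one before crashing

        List<String> sent = new ArrayList<>();
        int n = outbox.dispatchPending(sent::add); // the recovery service sends the rest
        System.out.println(n + " event(s) re-dispatched: " + sent);
    }
}
```

Because a row is removed only after the publish call returns, a crash between the two steps can send the same event twice, so this gives at-least-once delivery and consumers would still need to tolerate duplicates.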
----
2018-12-06 12:42:51 UTC - Sijie Guo: > will rackAwarePlacement policy alone 
will help?

rack aware placement policy will help

> Will this be a problem that ELB-B is not part of cluster metadata?

it is typically not a problem. ELB is more a “bootstrap” domain name. it just 
tells the clients where to find the brokers.
----
2018-12-06 16:30:42 UTC - Karthik Palanivelu: Sure @Sijie Guo, will test and 
post the results. Can you please let me know how I configure a bookie's DC 
when launching it, or how bookies identify which DC/rack they belong to?
----
2018-12-06 16:57:59 UTC - Shalin: Could you tell me how I could access the 
`msg` from the context? Looks like only the `message_id` and `topic` are being 
set in the `context`.
----
2018-12-06 17:01:29 UTC - Sanjeev Kulkarni: Use the getCurrentRecord interface 
to access the record and then use the record's getProperties to get the 
properties 
----
2018-12-06 17:08:53 UTC - Shalin: Ah, should have mentioned I am using the 
python sdk. Doesn't look like it is implemented in `contextimpl.py`
----
2018-12-06 17:11:47 UTC - Sanjeev Kulkarni: Usually the Java API is a little ahead. 
Could you please file an issue and we’ll get that fixed right away
----
2018-12-06 17:12:43 UTC - Shalin: :thumbsup_all: Sure thing
----
2018-12-06 18:38:21 UTC - lionel yony: @lionel yony has joined the channel
----
2018-12-06 18:51:07 UTC - Finn Neuik: @Finn Neuik has joined the channel
----
2018-12-06 19:17:35 UTC - Mike Card: @Mike Card has joined the channel
----
2018-12-06 19:23:05 UTC - Mike Card: Hey everyone. I am playing around a bit 
with Pulsar and I have run into a bizarre limitation. It seems that in my test 
code after I send 9 messages, the producer only sends the first 64 bytes of all 
following messages unless I wait for a minute or so before sending more. Is 
there some kind of rate limiting setting on the brokers or producer that I need 
to adjust? It is as though there is some kind of byte throughput limit that is 
set to a very low number. Just wondered if anyone else had seen this. I am 
using 2.2.0
----
2018-12-06 19:25:20 UTC - Matteo Merli: that doesn’t look like anything we’ve ever 
seen.. can you share the producer code?

Also, for baseline, use `pulsar-perf produce my-topic --rate 1000` to check 
that everything is working as expected
----
2018-12-06 20:26:11 UTC - durga: Hi all - Would pulsar golang client support 
wildcard topic subscription in pulsar 2.2?
----
2018-12-06 20:33:20 UTC - Matteo Merli: @durga yes, it’s already available in 
2.2
----
2018-12-06 20:53:30 UTC - durga: thanks @Matteo Merli
----
2018-12-06 22:34:06 UTC - Mike Card: @Matteo Merli yes I can. The producer in 
question is created like this:
```
                String pulsarUrl = eventProducerConfiguration.getBootstrapServers();
                client = PulsarClient.builder()
                    .serviceUrl(pulsarUrl)
                    .build();
```
<snip>
```
                // Set up Pulsar producers and consumers
                try {
                    eventProducer = client.newProducer()
                        .topic(FULL_MASTER_QUEUE_TOPIC_NAME)
                        .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                        .enableBatching(true)
                        .create();
                } catch (Throwable t) {
                    _log.error("DefaultDatabus.constructor: caught exception "
                        + t.getClass().getName() + " creating eventProducer to write to "
                        + FULL_MASTER_QUEUE_TOPIC_NAME + ", message = " + t.getMessage());
                }
```

and then it is invoked like this:
```
                    if (eventProducer != null && ref != null) {
                        eventProducer.send(UpdateRefSerializer.toByteBuffer(ref).array());
```
so all this seems pretty pedestrian to me, and under light load it all works
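
One thing that may be worth ruling out in the send path, although nothing in this thread establishes it as the cause: `ByteBuffer.array()` returns the buffer's whole backing array and ignores its position and limit, so the bytes sent can differ from the bytes the serializer actually wrote. The sketch below shows the difference and a way to copy only the readable bytes; `ByteBufferPayloads` and `toPayload` are hypothetical names, and how `UpdateRefSerializer.toByteBuffer` allocates its buffer is not shown in the thread.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper, not part of the code shown in the thread. */
final class ByteBufferPayloads {
    /** Copies exactly the readable bytes (position..limit) without disturbing the caller's buffer. */
    static byte[] toPayload(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // independent position/limit, shared content
        byte[] payload = new byte[view.remaining()];
        view.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        // A 16-byte buffer holding only 5 bytes of real data, flipped for reading.
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put(new byte[] {1, 2, 3, 4, 5});
        buffer.flip();

        // array() exposes the full backing array, padding included.
        System.out.println("array().length     = " + buffer.array().length);
        // toPayload() keeps only the bytes between position and limit.
        System.out.println("toPayload().length = " + toPayload(buffer).length);
        System.out.println("payload            = " + Arrays.toString(toPayload(buffer)));
    }
}
```

If the serializer always returns a buffer that exactly wraps its data, both calls yield the same bytes and this can be crossed off the list.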
----
2018-12-06 22:37:40 UTC - Grant Wu: Tip: delimit your code with triple 
backticks in order to get monospace formatting
----
2018-12-06 22:38:08 UTC - Mike Card: @Matteo Merli Now the consumer that reads 
this data is run from a thread pool and it is created like this:

```
    private void submitFanoutServiceTask(int taskNumber) {
        _kafkaFanoutService.submit(new Runnable() {
            @Override
            public void run() {
                doKafkaFanout(taskNumber, shutdownAllFanoutTasks);
            }
        });
    }
```

<NOTE: I did not refactor my code here so some things refer to Kafka where I 
am plugging in Pulsar in its place>

The method run by the thread is like this:

```
    /**
     * Task to pull events off master-queue topic and send them to
     * resolver-queue topic with their list of subscriptions attached
     */
    private void doKafkaFanout(int taskNumber, MutableBoolean shutdownFlag) {

        try (Timer.Context ignored1 = _pulsarFanoutTimer.time()) {

            _log.info("DefaultDatabus.doKafkaFanout-"+taskNumber+": created fannedOutEventProducer...");

            // Set up Pulsar consumer
            try (Consumer eventConsumer = client.newConsumer()
                    .topic(FULL_MASTER_QUEUE_TOPIC_NAME)
                    .subscriptionName("master-queue-subscription")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe()) {

                _log.info("DefaultDatabus.doKafkaFanout-"+taskNumber+": created eventConsumer...");

                // Poll master topic to see if there are update events
                while (shutdownFlag.isFalse()) {

                    // Get available events off the master queue topic
                    _log.info("DefaultDatabus.doKafkaFanout-"+taskNumber+": polling for master-queue topic entries...");
                    Message polledEvent = eventConsumer.receive();

                    try {

                        UpdateRef ref = UpdateRefSerializer.fromByteBuffer(ByteBuffer.wrap(polledEvent.getData()));
```
----
2018-12-06 22:39:23 UTC - Mike Card: @Grant Wu :+1::skin-tone-2: yeah, forgot to 
include MD; we use HipChat :face_vomiting: so I'm not used to having that!
----
2018-12-06 22:45:53 UTC - Mike Card: @Matteo Merli Now what I see happening is 
that the last line in the doKafkaFanout() snippet above fails because the 
deserializer only gets 64 bytes, so it can't deserialize the object and it 
throws an exception. Under light load this all works fine, but when you push it 
just a little you begin seeing this error. My code seems extremely simple, so I 
wasn't expecting to hit an issue like this but maybe I have something set 
wrong. My Pulsar cluster has 3 EC2 instances running in us-east-1a, 1b and 1c, 
all identical i3.4xlarge with a dedicated ZK ensemble also running in AWS. 
Thought I would run all this by the community just to see if anyone noticed 
anything or thought of some setting that may be impacting what I am doing.
----
2018-12-06 23:08:05 UTC - Mike Card: @Matteo Merli whoa I think I see what my 
problems might be, ran the command you suggested on my Pulsar cluster instance 
sitting in us-east-1a and got this result:
----
2018-12-06 23:09:49 UTC - Mike Card: @Matteo Merli wow these latencies are off 
the charts, it's no wonder I can't do load testing. Now the question is what 
happens when your write rate is way above the rate that these latencies 
suggest, which looks to be between 1 and 2 Hz?
----
2018-12-06 23:10:36 UTC - Matteo Merli: Where are you publishing from? Same 
region or outside ?
----
2018-12-06 23:11:27 UTC - Mike Card: This is being published from 1 of the 3 
instances in my pulsar cluster
----
2018-12-06 23:11:30 UTC - Mike Card: so inside
----
2018-12-06 23:12:11 UTC - Matteo Merli: i3.4xlarge should have NVMe disks, 
right?
----
2018-12-06 23:12:19 UTC - Mike Card: yes
----
2018-12-06 23:12:29 UTC - Matteo Merli: that latency doesn’t make sense then 
:confused:
----
2018-12-06 23:12:47 UTC - Mike Card: now the other 2 instances in the cluster 
are in us-east-1b and us-east-1c
----
2018-12-06 23:13:08 UTC - Mike Card: if that would affect this
----
2018-12-06 23:13:14 UTC - Matteo Merli: Can you verify you’re actually writing 
to that volume ?
----
2018-12-06 23:14:15 UTC - Mike Card: maybe, how would I do that? Is there some 
BookKeeper CLI I could run to check a topic file?
----
2018-12-06 23:14:38 UTC - Matteo Merli: you can check `iostat -xm 1` while 
publishing
----
2018-12-06 23:14:52 UTC - Matteo Merli: that will show the IO breakdown across 
the disks
----
2018-12-06 23:18:17 UTC - Mike Card: ```
Device:         rrqm/s   wrqm/s     r/s     w/s    rMB/s    wMB/s avgrq-sz avgqu-sz   await r_await w_await  svctm  %util
xvda              0.00     0.00    0.00 1071.00     0.00     3.56     6.80     0.69    0.66    0.00    0.66   0.50  54.00
xvdf              0.00     0.00    0.00    0.00     0.00     0.00     0.00     0.00    0.00    0.00    0.00   0.00   0.00
nvme0n1           0.00     0.00    0.00    0.00     0.00     0.00     0.00     0.00    0.00    0.00    0.00   0.00   0.00
nvme1n1           0.00     0.00    0.00    0.00     0.00     0.00     0.00     0.00    0.00    0.00    0.00   0.00   0.00
```
----
2018-12-06 23:18:26 UTC - Mike Card: xvda getting 100s of writes/sec
----
2018-12-06 23:18:56 UTC - Matteo Merli: doesn’t look like `nvme0n1` or 
`nvme1n1` are getting writes
----
2018-12-06 23:19:15 UTC - Matteo Merli: did you format and mount those 2 ?
----
2018-12-06 23:19:38 UTC - Mike Card: I will have to go back and look at the 
userdata
----
2018-12-06 23:19:50 UTC - Matteo Merli: `xvda` is the local instance disk which 
is very slow (and small)
----
2018-12-06 23:20:12 UTC - Mike Card: That is where I bet this is writing to
----
2018-12-06 23:20:29 UTC - Mike Card: Would explain the poor latency
----
2018-12-06 23:21:34 UTC - Mike Card: Now the question is, if I get the NVMe 
disks mounted, can I shut down the brokers and change a setting for them to use 
those disks and start them up again?
----
2018-12-06 23:22:27 UTC - Matteo Merli: Are you running brokers and bookies in 
same process configuration?
----
2018-12-06 23:23:39 UTC - Mike Card: well I set this up per the quick start so 
I started up BK first then Pulsar, they are in 2 separate processes I believe
----
2018-12-06 23:25:01 UTC - Matteo Merli: The only issue to be careful with is 
that BookKeeper has a cookie mechanism to prevent config errors..

So if you just restart the bookies pointing to the new disks, it will complain 
that the same bookie was already registered in ZooKeeper and there’s a mismatch 
with what this bookie is supposed to have on disk
----
2018-12-06 23:25:47 UTC - Mike Card: Oh yes I have run into that before, which 
I think means I will have to tear down the entire Pulsar cluster and rebuild it 
from scratch
----
2018-12-06 23:25:48 UTC - Matteo Merli: You can fix it by running on each node:

```
bin/bookkeeper shell bookieformat -deleteCookie
```
----
2018-12-06 23:25:57 UTC - Mike Card: oh nice! OK
----
2018-12-06 23:26:43 UTC - Mike Card: so I could delete the cookie, shut down BK, 
fix the conf so journalDir points to the new disk, and start it up again?
----
2018-12-06 23:27:02 UTC - Matteo Merli: Correct
----
2018-12-06 23:27:29 UTC - Matteo Merli: I’d suggest configuring 1 disk for 
journal and the other for ledger storage
----
2018-12-06 23:27:35 UTC - Mike Card: OK. I assume before deleting the cookie 
and shutting down BK I should shut down the broker on the node?
----
2018-12-06 23:28:27 UTC - Matteo Merli: ```
journalDirectory=/mnt/disk-1
ledgerDirectories=/mnt/disk-2
```
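
Before restarting the bookie, it may help to confirm that the configured directories really sit on the NVMe volumes and not on the root disk. The sketch below is one way to check from Java, using only the standard `java.nio.file` API; `BookieDiskCheck` is a hypothetical helper, not part of BookKeeper, and the directories are passed as arguments.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: reports which mounted volume backs each bookie directory. */
final class BookieDiskCheck {
    /** Describes the file store (mounted volume) that holds the given existing directory. */
    static String describeStore(Path dir) {
        try {
            FileStore store = Files.getFileStore(dir);
            return dir + " -> " + store.name() + " (" + store.type() + ")";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** True when both existing paths resolve to the same file store. */
    static boolean sameStore(Path a, Path b) {
        try {
            return Files.getFileStore(a).equals(Files.getFileStore(b));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("usage: java BookieDiskCheck.java <journalDir> <ledgerDir>");
            return;
        }
        Path journal = Paths.get(args[0]);
        Path ledgers = Paths.get(args[1]);
        System.out.println(describeStore(journal));
        System.out.println(describeStore(ledgers));

        Path root = Paths.get("/");
        if (sameStore(journal, root) || sameStore(ledgers, root)) {
            System.out.println("WARNING: a bookie directory is on the root volume");
        }
        if (sameStore(journal, ledgers)) {
            System.out.println("NOTE: journal and ledgers share one volume");
        }
    }
}
```

With the paths from the message above this would be run as `java BookieDiskCheck.java /mnt/disk-1 /mnt/disk-2`; on Linux the store name is typically the mounted device, so the output should name the NVMe devices if the mounts are in place.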
----
2018-12-06 23:28:52 UTC - Matteo Merli: yes, change that while the process is 
stopped
----
2018-12-06 23:30:14 UTC - Mike Card: OK I'll do this Matteo and let you know 
what I observe after fixing it, thanks so much for your help!
----
2018-12-06 23:30:23 UTC - Matteo Merli: :+1:
----
2018-12-07 08:25:20 UTC - Mithun Shetty: @Mithun Shetty has joined the channel
----
