2020-08-24 09:17:28 UTC - Takahiro Hozumi: Thank you for your advice!
----
2020-08-24 12:51:48 UTC - Frank Kelly: @Ali Ahmed Thank you!
----
2020-08-24 13:16:08 UTC - Jennifer Huang: @Jennifer Huang set the channel 
topic: Breaking news: Apache Pulsar has 300 contributors now, thank you all for 
your contributions to the Pulsar community 
<https://streamnative.io/blog/tech/2020-08-24-pulsar-300-contributors>
star-struck : Frank Kelly, Ali Ahmed
----
2020-08-24 14:37:40 UTC - Nazia Firdous: As we know, there are 3 subscription 
types. In a Failover subscription, when the master consumer disconnects, all 
(non-acknowledged and subsequent) messages are delivered to the next consumer. 
I am able to do Exclusive and Shared subscriptions, but I don't know how to 
achieve Failover. Below is my code. Kindly guide me on how I can disconnect the 
first consumer so that the next consumer will get all the messages.
```
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

class PulsarFailoverSubscription {
    public static void main(String[] args) {
        try {
            // PulsarClient pc = PulsarClient.builder().serviceUrl("pulsar://67.160.195.238:6650").build();
            PulsarClient pc = PulsarClient.builder().serviceUrl("pulsar://67.160.195.238:6618").build();
            PulsarFailoverSubscription psc = new PulsarFailoverSubscription();
            psc.producer1(pc, "Producer1");
            psc.consumer1(pc, "Consumer1");
            psc.consumer2(pc, "Consumer2");
        } catch (PulsarClientException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("Exiting out of Main");
    }

    // Sends 11 messages, one per second, from a background thread.
    private void producer1(PulsarClient pc, String id) {
        final Thread thread = new Thread(() -> {
            try {
                Producer<String> producer = pc.newProducer(Schema.STRING)
                        .topic("persistent://public/default/test-topic1")
                        .create();
                for (int i = 0; i <= 10; i++) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Thread Interrupted " + e.getMessage());
                    }
                    producer.sendAsync("Message from " + id + " : " + i);
                }
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        });
        thread.start();
    }

    // First consumer on the Failover subscription.
    public void consumer1(PulsarClient pc, String id) {
        final Thread thread1 = new Thread(() -> {
            try {
                Consumer<String> consumer1 = pc.newConsumer(Schema.STRING)
                        .topic(new String[]{"test-topic1"})
                        .subscriptionType(SubscriptionType.Failover)
                        .subscriptionName("test-subscription1")
                        .subscribe();

                while (true) {
                    Message<String> msg = consumer1.receive();
                    System.out.println(id + " Received " + new String(msg.getData()));
                }
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        });
        thread1.start();
        // Interrupt the thread immediately after starting it.
        thread1.interrupt();
    }

    // Second consumer on the same Failover subscription.
    public void consumer2(PulsarClient pc, String id) {
        final Thread thread2 = new Thread(() -> {
            try {
                Consumer<String> consumer2 = pc.newConsumer(Schema.STRING)
                        .topic(new String[]{"test-topic1"})
                        .subscriptionType(SubscriptionType.Failover)
                        .subscriptionName("test-subscription1")
                        .subscribe();
                while (true) {
                    Message<String> msg = consumer2.receive();
                    System.out.println(id + " Received " + new String(msg.getData()));
                }
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        });
        thread2.start();
    }
}
```

----
2020-08-24 14:55:53 UTC - Matt Mitchell: @Addison Higham I don’t see errors in 
the broker logs, but it’s possible that Pulsar was redeployed at some point 
after the problem. If it happens again, I’ll be sure to check the broker logs 
again though.
----
2020-08-24 15:28:26 UTC - Addison Higham: Failover subscriptions are usually 
used across two different processes (usually on different machines), as the 
main purpose is to take over once a machine/process fails while minimizing 
latency.

You could simulate this in a single process, but the easiest way to do so will 
be to use two different clients, each with a consumer. Then you can disconnect 
one client and have the other take over.
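
A minimal sketch of the two-client approach, assuming a broker reachable at `pulsar://localhost:6650` (an assumed URL, as are the consumer names and the class name `FailoverDemo`) and reusing the topic and subscription names from the question. Closing the first client drops its consumer, after which the broker should hand unacknowledged and subsequent messages to the second consumer. Which consumer the broker treats as active first is its own decision, so this is an illustration rather than a guaranteed trace:

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

class FailoverDemo {
    // Assumed value: adjust the service URL to point at your broker.
    static final String URL = "pulsar://localhost:6650";
    static final String TOPIC = "persistent://public/default/test-topic1";
    static final String SUBSCRIPTION = "test-subscription1";

    static PulsarClient newClient() throws PulsarClientException {
        return PulsarClient.builder().serviceUrl(URL).build();
    }

    static Consumer<String> subscribe(PulsarClient client, String name)
            throws PulsarClientException {
        return client.newConsumer(Schema.STRING)
                .topic(TOPIC)
                .subscriptionName(SUBSCRIPTION)
                .subscriptionType(SubscriptionType.Failover)
                .consumerName(name)
                .subscribe();
    }

    // Receive and acknowledge up to max messages, stopping early if nothing
    // arrives within 5 seconds.
    static void drain(Consumer<String> consumer, String name, int max)
            throws PulsarClientException {
        for (int i = 0; i < max; i++) {
            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
            if (msg == null) {
                break;
            }
            System.out.println(name + " received " + msg.getValue());
            consumer.acknowledge(msg);
        }
    }

    public static void main(String[] args) throws PulsarClientException {
        // One client per consumer, so each can be disconnected independently.
        PulsarClient producerClient = newClient();
        PulsarClient clientA = newClient();
        PulsarClient clientB = newClient();

        Consumer<String> consumerA = subscribe(clientA, "consumer-a");
        Consumer<String> consumerB = subscribe(clientB, "consumer-b");
        Producer<String> producer =
                producerClient.newProducer(Schema.STRING).topic(TOPIC).create();

        for (int i = 0; i < 6; i++) {
            producer.send("before-" + i);
        }
        // Acknowledge only part of the backlog on the first consumer.
        drain(consumerA, "consumer-a", 3);

        // Closing the whole client disconnects consumer-a from the broker.
        clientA.close();

        for (int i = 0; i < 3; i++) {
            producer.send("after-" + i);
        }
        // The remaining consumer should now be served by the subscription.
        drain(consumerB, "consumer-b", 10);

        clientB.close();
        producerClient.close();
    }
}
```

Unlike the code in the question, this acknowledges messages, so anything the first consumer already acknowledged is not redelivered after the takeover.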
----
2020-08-24 15:32:36 UTC - Nazia Firdous: Okay, thank you, I will try this.
----
2020-08-24 15:45:53 UTC - Nathan Mills: What benefits does setting a backlog 
quota policy and/or TTL in addition to a Retention policy provide?
----
2020-08-24 15:53:18 UTC - Addison Higham: It can still be desirable to put 
reasonable limits on a subscription backlog that a consumer would never 
realistically catch up on, which is where a backlog quota is useful. As an 
example, I store my messages for 30 days for Pulsar SQL queries, but in 
reality, my apps are never going to catch up on more than a few days of backlog.

Similarly, you may want to retain messages longer, but within a subscription, 
after X hours, maybe it doesn't make sense to have your app notified.
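
As a rough illustration of combining the three settings, here is a `broker.conf` fragment using the cluster-wide default keys. The 30-day retention comes from the example above; the TTL and quota values are hypothetical placeholders, and the same policies can also be set per namespace instead of as broker defaults:

```properties
# Keep acknowledged messages for 30 days (43200 minutes), e.g. for SQL queries.
defaultRetentionTimeInMinutes=43200
# Assumption: -1 means no size limit on retained data.
defaultRetentionSizeInMB=-1

# Hypothetical TTL: unacknowledged messages expire from subscriptions after 6 hours.
ttlDurationDefaultInSeconds=21600

# Hypothetical quota: cap each subscription backlog at 50 GB and evict the oldest entries.
backlogQuotaDefaultLimitGB=50
backlogQuotaDefaultRetentionPolicy=consumer_backlog_eviction
```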
----
2020-08-24 15:58:30 UTC - Nathan Mills: gotcha, does it have any performance 
impact on the cluster?
----
2020-08-24 16:05:40 UTC - Addison Higham: enabling or disabling them? not that 
I am aware of.  I guess having them disabled would be "one less thing" it has 
to check for, but it should be inconsequential
----
2020-08-24 16:52:07 UTC - Ruian: Hi, I wonder how a message id (ledger id, entry 
id, batch id) is generated. Could I know it in advance without asking the 
Pulsar broker?
Let’s say I have a producer sending messages in batches to a partitioned 
topic, and I have a consumer which knows the earliest message id of the 
partition is A. Could I generate a message id B from A so that I can use 
`consumer.seek(B)` to skip a particular number of messages?
----
2020-08-24 17:04:35 UTC - Addison Higham: This is tricky in the general case. 
`batchId` is simple, it is just the index of the messages in the batch. So if 
you only wanted to skip within batches, that might be doable.

However, if the number of messages you want to skip spans entries, that might 
be doable if you only had a single producer, but even then, I don't believe 
entryId is guaranteed to increase as just a single counter. LedgerId would be 
worse, as it is quite unpredictable.

What you might try instead is sequenceIDs.

If you have a way of generating predictable sequence ids, you could possibly 
use them. You wouldn't be able to seek, but you could filter the messages based 
on sequenceId
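
A small sketch of the filtering idea, written against a stand-in class so it runs without a broker. `Received`, `sample`, and `dropBefore` are hypothetical names; with the Pulsar Java client the sequence ID would presumably be assigned on the producer side through `TypedMessageBuilder.sequenceId(long)` and read on the consumer side through `Message.getSequenceId()`:

```java
import java.util.ArrayList;
import java.util.List;

class SequenceFilter {
    // Stand-in for a consumed message; only the fields needed for the sketch.
    static final class Received {
        final long sequenceId;
        final String payload;

        Received(long sequenceId, String payload) {
            this.sequenceId = sequenceId;
            this.payload = payload;
        }
    }

    // Build n fake messages with sequence IDs 0..n-1.
    static List<Received> sample(int n) {
        List<Received> out = new ArrayList<>();
        for (long i = 0; i < n; i++) {
            out.add(new Received(i, "message-" + i));
        }
        return out;
    }

    // Keep only messages whose sequence ID is at or after firstWanted.
    // Earlier messages are still delivered by the broker; they are just ignored.
    static List<Received> dropBefore(List<Received> batch, long firstWanted) {
        List<Received> kept = new ArrayList<>();
        for (Received r : batch) {
            if (r.sequenceId >= firstWanted) {
                kept.add(r);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        for (Received r : dropBefore(sample(10), 4)) {
            System.out.println(r.sequenceId + " " + r.payload);
        }
    }
}
```

The trade-off is that skipped messages are still read and transferred, which a real `seek` would avoid.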
----
2020-08-24 17:37:08 UTC - Ruian: It seems that it is impossible to 
precalculate the message id.
Actually I am writing my own Spark connector, and trying to limit the size of 
the first batch of Spark’s micro-batch reader.
Anyway, thank you for your reply!
----
2020-08-24 18:39:14 UTC - Joshua Decosta: Noob question here: do I need to 
explicitly create the topics that functions use as input and output when 
creating a function with the pulsar-admin client?
----
2020-08-24 18:47:37 UTC - Stepan Mazurov: @Stepan Mazurov has joined the channel
----
2020-08-24 19:03:00 UTC - Addison Higham: By default, no. The functions 
interface will just create a consumer/producer for any of the defined topics, 
and automatic topic creation in Pulsar creates topics for any producer or 
consumer. You can disable automatic topic creation with 
`allowAutoTopicCreation`.
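
For reference, a minimal `broker.conf` (or `standalone.conf`) fragment for that setting. With it set to `false`, a function's input and output topics would presumably have to be created before the function is deployed:

```properties
# Do not create a topic automatically when a producer or consumer connects to it.
allowAutoTopicCreation=false

# Only relevant while auto-creation is enabled: the type of topic that gets created.
# allowAutoTopicCreationType=non-partitioned
```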
----
2020-08-24 19:03:23 UTC - Joshua Decosta: I think in my situation it doesn’t 
work either way. My function pod keeps crashing and I’m not sure why. 
----
2020-08-24 19:03:46 UTC - Joshua Decosta: Are there some specific configs I 
need to add to get this working with the default helm charts?
----
2020-08-24 19:30:46 UTC - Addison Higham: do you have `functionAsPods` enabled? 
if so, do you have logs for the statefulset that is created (assuming it 
creates successfully)
----
2020-08-24 19:36:35 UTC - Joshua Decosta: I don’t have that enabled but it 
looks like it’s still generating pods 
----
2020-08-24 20:33:10 UTC - Frank Kelly: Seeing a 500 error using custom AuthN 
and AuthZ plugins (they worked in 2.5.2 but not in 2.6.1). This is just a 
Pulsar standalone instance.
- DEBUG log is already on. Is there a way to see the stack trace?

----
2020-08-24 20:33:10 UTC - Frank Kelly:
```
16:27:48.573 [pulsar-web-54-14] DEBUG com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthenticationProvider - Authenticating token . . .
16:27:48.576 [pulsar-web-54-14] DEBUG com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthenticationProvider - Token authenticated
16:27:48.576 [pulsar-web-54-14] DEBUG org.apache.pulsar.broker.web.AuthenticationFilter - [127.0.0.1] Authenticated HTTP request with role cogito
16:27:48.576 [pulsar-web-54-14] DEBUG org.eclipse.jetty.servlet.ServletHandler - call filter org.apache.pulsar.broker.web.ResponseHandlerFilter-7aa01bd9@7aa01bd9==org.apache.pulsar.broker.web.ResponseHandlerFilter,inst=true,async=true
16:27:48.576 [pulsar-web-54-14] DEBUG org.eclipse.jetty.servlet.ServletHandler - call servlet org.glassfish.jersey.servlet.ServletContainer-640d604@f679d7ba==org.glassfish.jersey.servlet.ServletContainer,jsp=null,order=-1,inst=true,async=true
16:27:48.583 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpOutput - write(array HeapByteBuffer@6c771e32[p=0,l=7120,c=8192,r=7120]={<<<\n --- An unexpected error...d.run(Thread.java:834)\n>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00})
16:27:48.583 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpOutput - write(array) s=CLOSING,api=BLOCKED,sc=false,e=null last=true agg=false flush=true async=false, len=7120 null
16:27:48.583 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpChannel - sendResponse info=null content=HeapByteBuffer@79ba1771[p=0,l=7120,c=8192,r=7120]={<<<\n --- An unexpected error...d.run(Thread.java:834)\n>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00} complete=true committing=true callback=Blocker@32faa6a7{null}
16:27:48.584 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpChannel - COMMIT for /admin/v2/namespaces/public on HttpChannelOverHttp@397aa579{s=HttpChannelState@3535581a{s=HANDLING rs=BLOCKING os=COMMITTED is=IDLE awp=false se=false i=true al=0},r=6,c=false/false,a=HANDLING,uri=//localhost:8080/admin/v2/namespaces/public,age=11}
500 Internal Server Error HTTP/1.1
Date: Mon, 24 Aug 2020 20:27:48 GMT
Content-Length: 7120
Content-Type: text/plain


16:27:48.584 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpConnection - generate: NEED_HEADER for org.eclipse.jetty.server.HttpConnection$SendCallback@58eeee2d[PROCESSING][i=HTTP/1.1{s=500,h=3,cl=7120},cb=org.eclipse.jetty.server.HttpChannel$SendCallback@4096f060] (null,[p=0,l=7120,c=8192,r=7120],true)@START
16:27:48.584 [pulsar-web-54-14] DEBUG org.eclipse.jetty.http.HttpGenerator - generateHeaders HTTP/1.1{s=500,h=3,cl=7120} last=true content=HeapByteBuffer@79ba1771[p=0,l=7120,c=8192,r=7120]={<<<\n --- An unexpected error...d.run(Thread.java:834)\n>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00}
```
----
2020-08-24 20:55:01 UTC - Frank Kelly: Figured it out: 
`org.apache.pulsar.broker.authorization.AuthorizationProvider` provides a bunch 
of default method implementations which basically throw an exception. My guess 
is they should NOT be default implementations?
<https://github.com/apache/pulsar/blob/48f5a2f62c148b3df617be060fefed51f3145979/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java#L230-L269>
----
2020-08-24 20:55:32 UTC - Frank Kelly: I will file an issue on GitHub tomorrow
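
The failure mode described above can be reproduced in plain Java. The sketch below uses entirely hypothetical names (`Authorizer`, `LegacyAuthorizer`, `canAdminNamespace`) and is not Pulsar's actual interface; it only shows how a method added to an interface with a throwing default body lets an older plugin keep compiling while failing at runtime, which a server would typically surface as a 500:

```java
// Hypothetical interface standing in for a pluggable authorization provider.
interface Authorizer {
    // Present in the "old" version of the interface.
    boolean canProduce(String role);

    // Added in the "new" version. The default body keeps old plugins
    // compiling, but the missing override only shows up when it is called.
    default boolean canAdminNamespace(String role) {
        throw new UnsupportedOperationException(
                "canAdminNamespace is not supported by this provider");
    }
}

// A plugin written against the old interface: it never overrides the new method.
class LegacyAuthorizer implements Authorizer {
    @Override
    public boolean canProduce(String role) {
        return "cogito".equals(role);
    }
}

class DefaultMethodDemo {
    // Maps the authorization outcome to an HTTP-like status for illustration.
    static String check(Authorizer authorizer, String role) {
        try {
            return authorizer.canAdminNamespace(role) ? "200" : "403";
        } catch (UnsupportedOperationException e) {
            return "500";
        }
    }

    public static void main(String[] args) {
        System.out.println(check(new LegacyAuthorizer(), "cogito")); // prints 500
    }
}
```

Had the new method been abstract, the old plugin would have failed to compile against the new version instead, which is the trade-off raised in the message above.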
----
2020-08-24 21:55:54 UTC - Evan Furman: Hi guys, we’re looking to take 
advantage of rack awareness for the bookies. Is there any reason to use 
`bookkeeperClientRackawarePolicyEnabled` rather than 
`bookkeeperClientRegionawarePolicyEnabled`? We’re running in two AZs in AWS.
----
2020-08-24 23:37:06 UTC - Addison Higham: This documentation (for another 
component that uses bookkeeper but is still applicable) 
<https://bookkeeper.apache.org/distributedlog/docs/latest/user_guide/implementation/storage.html#id8>
 gives an overview of the differences between the two.

RegionAware should mostly be the same as Rackaware, but simply adds another 
level of hierarchy to ensure it spreads across regions first and then can 
spread across racks.

Specifically:
> RackAware placement policy basically just chooses bookies from different 
racks in the built network topology. It guarantees that a write quorum will 
cover at least two racks.
> RegionAware placement policy is a hierarchical placement policy, which it 
chooses equal-sized bookies from regions, and within each region it uses 
RackAware placement policy to choose bookies from racks. For example, if there 
is 3 regions - region-a, region-b and region-c, an application want to allocate 
a 15-bookies ensemble. First, it would figure out there are 3 regions and it 
should allocate 5 bookies from each region. Second, for each region, it would 
use RackAware placement policy to choose 5 bookies.
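
The arithmetic in the quoted example can be sketched in a few lines. This is toy code, not BookKeeper's placement implementation; `RegionShare` and `perRegion` are hypothetical names, and handing any remainder to the first regions in list order is an assumption made only for this sketch:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RegionShare {
    // Split an ensemble evenly across regions. Any remainder goes to the
    // first regions in list order (an assumption of this sketch).
    static Map<String, Integer> perRegion(int ensembleSize, List<String> regions) {
        Map<String, Integer> share = new LinkedHashMap<>();
        int base = ensembleSize / regions.size();
        int extra = ensembleSize % regions.size();
        for (int i = 0; i < regions.size(); i++) {
            share.put(regions.get(i), base + (i < extra ? 1 : 0));
        }
        return share;
    }

    public static void main(String[] args) {
        List<String> regions = Arrays.asList("region-a", "region-b", "region-c");
        // prints {region-a=5, region-b=5, region-c=5}
        System.out.println(perRegion(15, regions));
    }
}
```

Within each region, the quoted policy would then apply rack-aware selection to pick that region's share of bookies.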
----
2020-08-25 04:50:03 UTC - Tymm: Hello, is it possible to subscribe to a list of 
Pulsar topics using the WebSocket API?
----
2020-08-25 08:13:47 UTC - Evion Cane: @Evion Cane has joined the channel
----
2020-08-25 08:58:47 UTC - Vil: I am not sure, but the docs do not include topic 
listing <https://pulsar.apache.org/docs/en/client-libraries-websocket/>. But 
perhaps the docs do not have all the info?
----
