2020-08-24 09:17:28 UTC - Takahiro Hozumi: Thank you for your advice!
----
2020-08-24 12:51:48 UTC - Frank Kelly: @Ali Ahmed Thank you!
----
2020-08-24 13:16:08 UTC - Jennifer Huang: @Jennifer Huang set the channel
topic: Breaking news: Apache Pulsar has 300 contributors now, thank you all for
your contribution to Pulsar community
<https://streamnative.io/blog/tech/2020-08-24-pulsar-300-contributors>
star-struck : Frank Kelly, Ali Ahmed
----
2020-08-24 14:37:40 UTC - Nazia Firdous: As we know there are 3
<http://subscriptions.In|subscriptions.In> Failover Subscription When the
master consumer disconnects, all (non-acknowledged and subsequent) messages are
delivered to the next consumer..I can able to do Exclusive and shared
subscription..But i don't know how to achieve Failover..Below are my Code
Kindly guide me how i can disconnect the first consumer so Next consumer will
get all the messages.
```class PulsarFailoverSubscription {
public static void main(String[] args) {
try {
boolean exit;
// PulsarClient pc =
PulsarClient.builder().serviceUrl("<pulsar://67.160.195.238:6650>").build();
PulsarClient pc =
PulsarClient.builder().serviceUrl("<pulsar://67.160.195.238:6618>").build();
PulsarFailoverSubscription psc = new PulsarFailoverSubscription();
psc.producer1(pc, "Producer1");
psc.consumer1(pc, "Consumer1");
psc.consumer2(pc, "Consumer2");
} catch (PulsarClientException e) {
e.getMessage();
}
System.out.println("Exiting out of Main");
}//catch
private void producer1(PulsarClient pc, String id) {
final Thread thread = new Thread(()-> {
try {
Producer<String> producer =
pc.newProducer(Schema.STRING)
.topic("<persistent://public/default/test-topic1>")
.create();
for (int i = 0; i <= 10; i++) {
try {
Thread.sleep(1000);
}//try
catch (InterruptedException e) {
System.out.println("Thread Interrupted
"+e.getMessage());
}//catch
producer.sendAsync("Message from " +id+" : "+ i);
} //for
}//try
catch (Exception e) {
e.getMessage();
}//catch
});thread.start();
}
public void consumer1(PulsarClient pc, String id) {
final Thread thread1 = new Thread(() -> {
try {
Consumer<String> consumer1 = pc.newConsumer(Schema.STRING)
.topic(new String[]{"test-topic1"})
.subscriptionType(SubscriptionType.Failover)
.subscriptionName("test-subscription1")
.subscribe();
while (true) {
Message<String> msg = consumer1.receive();
System.out.println(id + " Received " + new
String(msg.getData()));
}
}//try
catch (Exception e) {
e.getMessage();
}//catch
});thread1.start();
thread1.interrupt();
}
public void consumer2(PulsarClient pc, String id) {
final Thread thread2 = new Thread(() -> {
try {
Consumer<String> consumer2 = pc.newConsumer(Schema.STRING)
.topic(new String[]{"test-topic1"})
.subscriptionType(SubscriptionType.Failover)
.subscriptionName("test-subscription1")
.subscribe();
while (true) {
Message<String> msg = consumer2.receive();
System.out.println(id + " Received " + new
String(msg.getData()));
}
}//try
catch (Exception e) {
e.getMessage();
}//catch
});thread2.start();
}
}```
----
2020-08-24 14:55:53 UTC - Matt Mitchell: @Addison Higham I don’t see errors in
the broker logs, but it’s possible that Pulsar was redeployed at some point
after the problem. If it happens again, I’ll be sure to check the broker logs
again though.
----
2020-08-24 15:28:26 UTC - Addison Higham: Failover subscriptions are usually
used on two different processes (usually on different machines) as the main
purpose is to be used one a machine/process fails and you want to minimize
latency.
You could simulate this in a single process, but the easiest way to do so will
be to use two different clients, each with a consumer. Then you can disconnect
one client and have the other take over
----
2020-08-24 15:32:36 UTC - Nazia Firdous: Okay Thank you i will try this..
----
2020-08-24 15:45:53 UTC - Nathan Mills: What benefits does setting a backlog
quota policy and/or TTL in addition to a Retention policy provide?
----
2020-08-24 15:53:18 UTC - Addison Higham: it can still be desired to put
reasonable limits on subscription backlog that a consumer may just never
reasonably catch up from, which is where a backlog quota is useful. As an
example, I store my messages for 30 days for Pulsar SQL queries, but in
reality, my apps are never going to catch up to more than a few days of backlog.
Similarly, you may want to retain messages longer, but within a subscription,
after X hours, maybe it doesn't make sense to have your app notified.
----
2020-08-24 15:58:30 UTC - Nathan Mills: gotcha, does it have any performance
impact on the cluster?
----
2020-08-24 16:05:40 UTC - Addison Higham: enabling or disabling them? not that
I am aware of. I guess having them disabled would be "one less thing" it has
to check for, but it should be inconsequential
----
2020-08-24 16:52:07 UTC - Ruian: Hi, I wonder how message id (ledger id, entry
id, batch id) is generated? Could I know it in advance without asking pulsar
broker?
That’s say I have a producer sending messages in batches to a topic
partitioned. And I have a consumer which knows the earliest message id of the
partition is A. Could I generate a message id B from A so that I can use
`consumer.seek(B)` to skip particular number of messages?
----
2020-08-24 17:04:35 UTC - Addison Higham: This is tricky in the general case.
`batchId` is simple, it is just the index of the messages in the batch. So if
you only wanted to skip within batches, that might be doable.
However, if the number of messages you want to skip span entries, that might be
doable if you only had a single producer, but even then, I don't believe
entryId is guaranteed to increase just a single counter. LedgerId would be
worse as it is quite unpredictable
What you might try instead is sequenceIDs
If you have a way of generating predictable sequence ids, you could possibly
use them. You wouldn't be able to seek, but you could filter the messages based
on sequenceId
----
2020-08-24 17:37:08 UTC - Ruian: It seems that it is impossible to pre
calculate the message id.
Actually I am writing my own spark connector, and trying to limit the size of
the first batch of spark’s micro batch reader.
Anyway, thank you for your replying!
----
2020-08-24 18:39:14 UTC - Joshua Decosta: Noob question here, do i need to
explicitly create the topics that functions use as input and output when
creating a function with the pulsar-admin client?
----
2020-08-24 18:47:37 UTC - Stepan Mazurov: @Stepan Mazurov has joined the channel
----
2020-08-24 19:03:00 UTC - Addison Higham: by default no. The functions
interface will just create a consumer/producer for any of the defined topics,
and automatic topic creation in pulsar creates topics for any producer or
consumer. You can disable automatic topi c creation with
`allowAutoTopicCreation`
----
2020-08-24 19:03:23 UTC - Joshua Decosta: I think for my situation it doesn’t
even make it work either way. My function pod keeps crashing and I’m not sure
why.
----
2020-08-24 19:03:46 UTC - Joshua Decosta: Is there some specific configs i need
to add to get this working with the default helm charts?
----
2020-08-24 19:30:46 UTC - Addison Higham: do you have `functionAsPods` enabled?
if so, do you have logs for the statefulset that is created (assuming it
creates successfully)
----
2020-08-24 19:36:35 UTC - Joshua Decosta: I don’t have that enabled but it
looks like it’s still generating pods
----
2020-08-24 20:33:10 UTC - Frank Kelly: Seeing a 500 error using Custom Authn
and AuthZ plugins (they worked in 2.5.2 but not in 2.6.1) this is just a
pulsar standalone instance
- DEBUG log is already on - is there a way to see the stack trace?
----
2020-08-24 20:33:10 UTC - Frank Kelly: ```16:27:48.573 [pulsar-web-54-14] DEBUG
com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthenticationProvider
- Authenticating token . . .
16:27:48.576 [pulsar-web-54-14] DEBUG
com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthenticationProvider
- Token authenticated
16:27:48.576 [pulsar-web-54-14] DEBUG
org.apache.pulsar.broker.web.AuthenticationFilter - [127.0.0.1] Authenticated
HTTP request with role cogito
16:27:48.576 [pulsar-web-54-14] DEBUG org.eclipse.jetty.servlet.ServletHandler
- call filter
org.apache.pulsar.broker.web.ResponseHandlerFilter-7aa01bd9@7aa01bd9==org.apache.pulsar.broker.web.ResponseHandlerFilter,inst=true,async=true
16:27:48.576 [pulsar-web-54-14] DEBUG org.eclipse.jetty.servlet.ServletHandler
- call servlet
org.glassfish.jersey.servlet.ServletContainer-640d604@f679d7ba==org.glassfish.jersey.servlet.ServletContainer,jsp=null,order=-1,inst=true,async=true
16:27:48.583 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpOutput -
write(array HeapByteBuffer@6c771e32[p=0,l=7120,c=8192,r=7120]={<<<\n
--- An unexpected
error...d.run(Thread.java:834)\n>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00})
16:27:48.583 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpOutput -
write(array) s=CLOSING,api=BLOCKED,sc=false,e=null last=true agg=false
flush=true async=false, len=7120 null
16:27:48.583 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpChannel -
sendResponse info=null
content=HeapByteBuffer@79ba1771[p=0,l=7120,c=8192,r=7120]={<<<\n ---
An unexpected
error...d.run(Thread.java:834)\n>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00}
complete=true committing=true callback=Blocker@32faa6a7{null}
16:27:48.584 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpChannel -
COMMIT for /admin/v2/namespaces/public on
HttpChannelOverHttp@397aa579{s=HttpChannelState@3535581a{s=HANDLING rs=BLOCKING
os=COMMITTED is=IDLE awp=false se=false i=true
al=0},r=6,c=false/false,a=HANDLING,uri=//localhost:8080/admin/v2/namespaces/public,age=11}
500 Internal Server Error HTTP/1.1
Date: Mon, 24 Aug 2020 20:27:48 GMT
Content-Length: 7120
Content-Type: text/plain
16:27:48.584 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpConnection -
generate: NEED_HEADER for
org.eclipse.jetty.server.HttpConnection$SendCallback@58eeee2d[PROCESSING][i=HTTP/1.1{s=500,h=3,cl=7120},cb=org.eclipse.jetty.server.HttpChannel$SendCallback@4096f060]
(null,[p=0,l=7120,c=8192,r=7120],true)@START
16:27:48.584 [pulsar-web-54-14] DEBUG org.eclipse.jetty.http.HttpGenerator -
generateHeaders HTTP/1.1{s=500,h=3,cl=7120} last=true
content=HeapByteBuffer@79ba1771[p=0,l=7120,c=8192,r=7120]={<<<\n ---
An unexpected
error...d.run(Thread.java:834)\n>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00}```
----
2020-08-24 20:55:01 UTC - Frank Kelly: Figured it out
`org.apache.pulsar.broker.authorization.AuthorizationProvider` provides a bunch
of default implementation methods which basically throw an exception. My guess
is they should NOT be default implementations?
<https://github.com/apache/pulsar/blob/48f5a2f62c148b3df617be060fefed51f3145979/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java#L230-L269>
----
2020-08-24 20:55:32 UTC - Frank Kelly: I will file an issue on GitHub tomorrow
----
2020-08-24 21:55:54 UTC - Evan Furman: Hi guys, we’re looking to take
adavantage of rack awareness for the bookies. Is there any reason to use
`bookkeeperClientRackawarePolicyEnabled` rather than
`bookkeeperClientRegionawarePolicyEnabled` ? We’re running in two AZs in AWS.
----
2020-08-24 23:37:06 UTC - Addison Higham: This documentation (for another
component that uses bookkeeper but is still applicable)
<https://bookkeeper.apache.org/distributedlog/docs/latest/user_guide/implementation/storage.html#id8>
gives an overview of the differences between the two.
RegionAware should mostly be the same as Rackaware, but simply adds another
level of hierarchy to ensure it spreads across regions first and then can
spread across racks.
Specifcally:
> RackAware placement policy basically just chooses bookies from different
racks in the built network topology. It guarantees that a write quorum will
cover at least two racks.
> RegionAware placement policy is a hierarchical placement policy, which it
chooses equal-sized bookies from regions, and within each region it uses
RackAware placement policy to choose bookies from racks. For example, if there
is 3 regions - region-a, region-b and region-c, an application want to allocate
a 15-bookies ensemble. First, it would figure out there are 3 regions and it
should allocate 5 bookies from each region. Second, for each region, it would
use RackAware placement policy to choose 5 bookies.
----
2020-08-25 04:50:03 UTC - Tymm: Hello, is it possible to subscribe to a list of
pulsar topic using websocket api?
----
2020-08-25 08:13:47 UTC - Evion Cane: @Evion Cane has joined the channel
----
2020-08-25 08:58:47 UTC - Vil: I am not sure but docs do not include topic
listing <https://pulsar.apache.org/docs/en/client-libraries-websocket/>. But
perhaps docs have not all the info?
----