2020-05-18 10:08:18 UTC - Pierre-Yves Lebecq: Hello :wave:
Quick question about Pulsar Functions state, and more specifically the 
`incrCounter` method of the Context object. Does this method offer some kind of 
atomicity? For example, if I run many instances of the same function, could 
race conditions make the counter wrong at some point? 
The API makes me think it does ensure atomicity, but I'm having a hard time 
confirming it from the Pulsar or BookKeeper documentation.
eyes : Gilles Barbier
+1 : Frank Kelly
----
2020-05-18 10:12:08 UTC - JG: Yes, it doesn't work. So how is it possible to 
get JDK 11 functions working on Pulsar? Is there a way to use Pulsar with JDK 11?
----
2020-05-18 10:26:58 UTC - Kirill Merkushev: You could try 
<https://github.com/bsideup/jabel>
----
2020-05-18 11:19:25 UTC - crtomir: @crtomir has joined the channel
----
2020-05-18 12:54:33 UTC - Andreas Müller: @Andreas Müller has joined the channel
----
2020-05-18 12:58:02 UTC - Andreas Müller: Hi, I'm looking for a fat jar of the 
pulsar java client. There is one at maven central (pulsar-client-all) but it 
lacks classes like `org.apache.pulsar.client.api.PulsarClient`. Any hints? I 
need the fat jar. Currently the only way to use the client is to add all jar 
files from the `lib` directory which is 100+ MB... Thanks, Andreas
----
2020-05-18 14:53:40 UTC - VanderChen: Hello, everyone! I have a question about 
*Key Shared Mode*.  In my system, Pulsar is running in *standalone* mode with 
*Docker*.
`docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.5.1 bin/pulsar standalone`

When I use *keySharedPolicy* to specify a key for the consumer, the consumer 
still receives all the messages.
For example, when I send messages with keys "key-1" and "key-2", the consumer 
for key-1 receives both the key-1 and the key-2 messages.
I don't know whether the problem is in my code or in my system settings.

The demo code is as follows,
*Producer code*
```import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class PulsarProducer {

    private static PulsarClient client;
    private static Producer<byte[]> producer;

    public static void main(String[] args) throws Exception {
        client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        producer = client.newProducer()
                .topic("my-topic")
                .create();

        startProducer();
    }

    // Sends the same ten keyed messages once per second, forever.
    private static void startProducer() throws Exception {
        while (true) {
            System.out.println("Key Shared Message round!");

            producer.newMessage().key("key-1").value("message-1-1\n".getBytes()).send();
            producer.newMessage().key("key-1").value("message-1-2\n".getBytes()).send();
            producer.newMessage().key("key-1").value("message-1-3\n".getBytes()).send();
            producer.newMessage().key("key-2").value("message-2-1\n".getBytes()).send();
            producer.newMessage().key("key-2").value("message-2-2\n".getBytes()).send();
            producer.newMessage().key("key-2").value("message-2-3\n".getBytes()).send();
            producer.newMessage().key("key-3").value("message-3-1\n".getBytes()).send();
            producer.newMessage().key("key-3").value("message-3-2\n".getBytes()).send();
            producer.newMessage().key("key-4").value("message-4-1\n".getBytes()).send();
            producer.newMessage().key("key-4").value("message-4-2\n".getBytes()).send();

            Thread.sleep(1000);
        }
    }
}```
*The Consumer code*
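```import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.Murmur3_32Hash;

public class PulsarConsumers {

    private static PulsarClient client;
    private static Consumer<byte[]> consumer;

    public static void main(String[] args) throws Exception {
        client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Hash slot used for the sticky range below (computed from the literal "key").
        int hashcode = Murmur3_32Hash.getInstance().makeHash("key") % 65536;

        consumer = client.newConsumer()
                .topic("my-topic")
                .ackTimeout(30, TimeUnit.SECONDS)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Key_Shared)
                .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hashcode, hashcode)))
                .subscribe();

        startConsumer();
    }

    private static void startConsumer() throws PulsarClientException {
        while (true) {
            // Wait for a message
            Message<byte[]> msg = consumer.receive();
            try {
                System.out.printf("Message received: %s", new String(msg.getData()));
                consumer.acknowledge(msg);
            } catch (Exception e) {
                System.err.printf("Unable to consume message: %s", e.getMessage());
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}```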
----
2020-05-18 14:54:50 UTC - Matteo Merli: It just depends on `pulsar-client-api` 
as well. This was done so that you can get Javadoc and browse the source code 
in IDEs.
----
2020-05-18 16:05:29 UTC - Andreas Müller: Unfortunately it has more 
dependencies like protobuf. I’m using client-all and pulsar-api and still 
getting this:
----
2020-05-18 16:14:08 UTC - Andreas Müller: I had to add `org.lz4-lz4-java-1.5.0` 
and `org.apache.pulsar-protobuf-shaded-2.1.0-incubating` plus the 2 above. 
Actually not what I would call a shaded jar. Everything should be included in 
`client-all`.
----
2020-05-18 17:04:09 UTC - Sijie Guo: Currently the state is a “global” state, 
hosted on the BookKeeper side. It ensures all updates are sequenced into a log 
before mutating the state, so it prevents race conditions.
----
2020-05-18 17:06:06 UTC - Sijie Guo: You are computing a different hashcode?

```int hashcode = Murmur3_32Hash.getInstance().makeHash("key") % 65536;```
----
2020-05-18 17:08:47 UTC - Sijie Guo: @JG we have to build and release binaries 
using JDK11.
----
2020-05-18 17:09:32 UTC - Pierre-Yves Lebecq: Thanks for the reply. I 
understand that from Pulsar's point of view it does; however, I assume multiple 
instances of the same function can still create race conditions. For example, 
if I build counters with putState / getState instead of using the counter 
methods, and two instances do the following at the same time (pseudo code):

counter = getState("my.counter.key") // we assume current state is 42
putState("my.counter.key", counter + 1) // will result in 43 instead of 44

Am I correct with this?
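
A minimal sketch of the interleaving described above. It uses a plain in-memory map as a stand-in for the state store, so the class and method names are illustrative and are not Pulsar APIs. It only shows the general pattern: a separate read followed by a write can lose an update, while an increment applied by the store as a single step cannot. Whether Pulsar's `getState` / `putState` behave this way across function instances is not confirmed in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for a shared key/value state store; this is NOT the Pulsar Context API.
final class LostUpdateDemo {

    static final String KEY = "my.counter.key";

    // Two instances interleave get/put: both read 42 before either one writes.
    static long readModifyWrite() {
        Map<String, Long> state = new HashMap<>();
        state.put(KEY, 42L);

        long seenByA = state.get(KEY);   // instance A reads 42
        long seenByB = state.get(KEY);   // instance B reads 42 before A has written
        state.put(KEY, seenByA + 1);     // A writes 43
        state.put(KEY, seenByB + 1);     // B overwrites with 43: one increment is lost

        return state.get(KEY);
    }

    // Each increment is applied by the store itself as one atomic step.
    static long atomicIncrement() {
        ConcurrentHashMap<String, Long> state = new ConcurrentHashMap<>();
        state.put(KEY, 42L);

        state.merge(KEY, 1L, Long::sum); // instance A
        state.merge(KEY, 1L, Long::sum); // instance B

        return state.get(KEY);
    }

    public static void main(String[] args) {
        System.out.println("get + put:        " + readModifyWrite());  // 43
        System.out.println("atomic increment: " + atomicIncrement());  // 44
    }
}
```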
----
2020-05-18 17:18:27 UTC - JG: @Sijie Guo do you think it's possible to make a 
JDK11 release of Pulsar? JDK11 is gaining in popularity, and there is GraalVM 
to improve performance as well...
----
2020-05-18 17:19:56 UTC - Sijie Guo: Yes. Will include JDK11 release for the 
upcoming 2.6.0 release.
+1 : David Kjerrumgaard
----
2020-05-18 18:34:09 UTC - Rounak Jaggi: How can we enable tls certs in 
zookeeper?
----
2020-05-18 18:35:44 UTC - Matteo Merli: For LZ4 there are problems in including 
and shading it (eg: the relocated C libraries are not found anymore).

The protobuf stuff should be included.
----
2020-05-18 19:37:13 UTC - Alan Broddle: Zookeeper TLS Question: So far, we 
have used Java command-line options to pass the TLS-related information to 
ZooKeeper. We are concerned about the current password parameter:
> -Dzookeeper.ssl.trustStore.password=<passwordhere>
Given that it is bad practice to expose passwords in the arguments of running 
processes on a server, is there a property that takes the location of a 
password file instead?
Is there something like this?
> -Dzookeeper.ssl.trustStore.password<FilePath>=<Path>
----
2020-05-18 20:46:30 UTC - Sijie Guo: 
<https://cwiki.apache.org/confluence/display/ZOOKEEPER/ZooKeeper+SSL+User+Guide>
----
2020-05-18 20:47:04 UTC - Sijie Guo: @Enrico Olivelli is a ZooKeeper PMC 
member. He can probably help with this question.
----
2020-05-18 20:56:28 UTC - Ryan Van Antwerp: @Ryan Van Antwerp has joined the 
channel
----
2020-05-18 21:33:37 UTC - JG: ok nice !!!
----
2020-05-18 23:36:26 UTC - VanderChen: I want to specify a key for the 
consumer. For example, given messages with key-1 and key-2, I want to 
construct a consumer that only receives messages with key-1, but I don't know 
how to achieve this.
I used the following to find the relationship between a String key and its 
hashcode, and passed the key's hashcode when constructing the consumer. It now 
looks like that is the wrong way.
```int hashcode = Murmur3_32Hash.getInstance().makeHash("key") % 65536;```

----
2020-05-19 00:44:27 UTC - Rounak Jaggi: This is using keystore and truststore. 
Is there a way to use PEM for ZK TLS?
----
2020-05-19 01:11:56 UTC - Tymm: Hi, I am using Pulsar 2.4.2 / 2.5.1 standalone 
and have a problem where Pulsar functions that use state get stuck or stop 
working after 10k+ successful operations (sometimes more, but < 100k). I 
have to restart Pulsar standalone, or delete and re-add the functions, to make 
them run for the next 10k+ operations. Any thoughts?
----
2020-05-19 01:42:30 UTC - Penghui Li: Can you try to use 
`Murmur3_32Hash.getInstance().makeHash("key-1") % 65536` for consumer-1 and use 
`Murmur3_32Hash.getInstance().makeHash("key-2") % 65536` for consumer-2?
----
2020-05-19 01:48:19 UTC - Penghui Li: Can this be reproduced consistently? 
Could you please create an issue on GitHub, ideally with steps to reproduce? 
Thanks.
----
2020-05-19 02:35:27 UTC - Alexandre DUVAL: What is the best way to get metrics 
per namespace?
----
2020-05-19 02:47:38 UTC - Luke Stephenson: @Luke Stephenson has joined the 
channel
----
2020-05-19 02:55:04 UTC - Luke Stephenson: Hello.  I've recently spun up a 
pulsar cluster on k8s using the helm templates.  After creating a producer 
which published messages to a persistent topic, the brokers have stopped 
starting with the following error:
`ERROR org.apache.pulsar.PulsarBrokerStarter - -- Shutting down - Received OOM 
exception: failed to allocate 16777216 byte(s) of direct memory (used: 
268435456, max: 268435456)`
I doubled the memory allocated to the brokers but now it just fails with a 
similar message reflecting more memory used:
`ERROR org.apache.pulsar.PulsarBrokerStarter - -- Shutting down - Received OOM 
exception: failed to allocate 16777216 byte(s) of direct memory (used: 
536870912, max: 536870912)`
I've seen a bug report which may fix this issue 
<https://github.com/apache/pulsar/pull/6634>, but that is not available yet.
I'm assuming the brokers should work with the default amount of memory 
suggested in the helm templates (perhaps not optimal caching, but should be 
stable).
Any suggestions?
Thanks.
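
For reference, the byte counts in the two errors above are round binary sizes: each failed allocation asks for 16 MiB while the direct memory pool is already at its limit, which is 256 MiB in the first error and 512 MiB after doubling. A small sketch of that arithmetic (the class and method names are illustrative):

```java
// Converts the byte counts quoted in the broker errors into MiB.
final class DirectMemoryMath {

    static final long MIB = 1024L * 1024L;

    static long toMiB(long bytes) {
        return bytes / MIB;
    }

    public static void main(String[] args) {
        System.out.println("requested allocation: " + toMiB(16_777_216L) + " MiB");   // 16 MiB
        System.out.println("first limit:          " + toMiB(268_435_456L) + " MiB");  // 256 MiB
        System.out.println("limit after doubling: " + toMiB(536_870_912L) + " MiB");  // 512 MiB
    }
}
```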
----
2020-05-19 02:56:54 UTC - Sam Xie: @Sam Xie has joined the channel
----
2020-05-19 03:54:12 UTC - VanderChen: I have tried this, but it does not work. 
Consumer 1 still receives all the messages.
----
2020-05-19 04:22:39 UTC - ckdarby: Isn't this exposed in Prometheus? I see 
metrics in Grafana for cluster, namespace and topics.
----
2020-05-19 04:24:06 UTC - Ruian: I would recommend you try replacing all values 
of  `BOOKIE_MEM` and `PULSAR_MEM`  env to  `" -XX:+UseContainerSupport 
-XX:InitialRAMPercentage=40.0 -XX:MinRAMPercentage=20.0 
-XX:MaxRAMPercentage=80.0 "` and setting `resource memory limits` to 500M in 
the k8s container spec.
----
2020-05-19 05:39:23 UTC - Ken Huang: Hi, I use configurationStore to deploy two 
clusters. I found that if I enable functionsWorker, only the first cluster's 
broker works, and the second cluster's broker fails with this error:


----
2020-05-19 05:39:23 UTC - Ken Huang: ```02:15:38.662 [ForkJoinPool.commonPool-worker-0] WARN  org.apache.pulsar.broker.web.PulsarWebResource - Namespace missing local cluster name in clusters list: local_cluster=pulsar-14 ns=public/functions clusters=[pulsar-145]
02:15:38.675 [pulsar-web-41-1] INFO  org.eclipse.jetty.server.RequestLog - 10.244.102.110 - - [18/May/2020:02:15:38 +0000] "PUT /admin/v2/persistent/public/functions/assignments HTTP/1.1" 412 60 "-" "Pulsar-Java-v2.5.1" 118
02:15:38.689 [AsyncHttpClient-52-1] WARN  org.apache.pulsar.client.admin.internal.BaseResource - [http://pulsar-14-broker-0.pulsar-14-broker.pulsar.svc.cluster.local:8080/admin/v2/persistent/public/functions/assignments] Failed to perform http put request: javax.ws.rs.ClientErrorException: HTTP 412 Precondition Failed
02:15:38.698 [main] ERROR org.apache.pulsar.functions.worker.WorkerService - Error Starting up in worker
org.apache.pulsar.client.admin.PulsarAdminException$PreconditionFailedException: Namespace does not have any clusters configured
        at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:220) ~[org.apache.pulsar-pulsar-client-admin-original-2.5.1.jar:2.5.1]
        at org.apache.pulsar.client.admin.internal.BaseResource$1.failed(BaseResource.java:130) ~[org.apache.pulsar-pulsar-client-admin-original-2.5.1.jar:2.5.1]
        at org.glassfish.jersey.client.JerseyInvocation$4.failed(JerseyInvocation.java:1030) ~[org.glassfish.jersey.core-jersey-client-2.27.jar:?]
        at org.glassfish.jersey.client.JerseyInvocation$4.completed(JerseyInvocation.java:1017) ~
        ... 47 more```
----
2020-05-19 05:39:23 UTC - Ken Huang: 
P.S. pulsar-14 and pulsar-145 are my cluster names.
----
2020-05-19 06:31:51 UTC - Patrik Kleindl: Hi, some general questions, as both 
Pulsar and Bookkeeper require Zookeeper, is it a common practice to use the 
same ZK cluster for both or better to keep it separated?
I was just wondering how many instances of ZK would be needed, as other systems 
usually require 3-5 ZK instances.
How resource-intensive is ZK regarding Pulsar and Bookkeeper?
----
2020-05-19 06:31:54 UTC - Andreas Müller: Filed a bug: 
<https://github.com/apache/pulsar/issues/6982>
----
2020-05-19 06:32:35 UTC - Rattanjot Singh: What configuration options are 
available to keep a connection alive when it is idle for a long time? The 
connection gets closed after 60 seconds and the producer automatically uses a 
new connection. Setting keepAliveInterval 
(<http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientBuilder.html#keepAliveInterval-int-java.util.concurrent.TimeUnit->)
 on the client builder does not help keep the connection alive.
----
2020-05-19 06:50:01 UTC - Luke Stephenson: We will give this a go @Ruian
----
2020-05-19 06:50:52 UTC - Luke Stephenson: But regardless of the memory 
allocated, it shouldn't OOM right, just work less efficiently.  Do you have any 
knowledge around this?
----
2020-05-19 06:52:34 UTC - Luke Stephenson: The pulsar docs mention the 
following about compacted topics 
(<https://pulsar.apache.org/docs/en/cookbooks-compaction/#when-should-i-use-compacted-topics>):
• They can read from the "original," non-compacted topic in case they need 
access to "historical" values, i.e. the entirety of the topic's messages.
• They can read from the compacted topic if they only want to see the most 
up-to-date messages.
What if I'm not interested in the entirety of the original topic's values and 
just want the compacted version? Is it possible to have a retention policy 
that only cleans up the "original" topic to prevent it from growing? We will 
only have consumers of the compacted topic, so it would be very wasteful to 
continue to store the complete history of messages on the original topic.

Also, when we publish messages that can be compacted, they typically need to 
have the full copy of the latest state (as we are saying it's valid to skip 
over intermediate states).  So the intention of the producers is to publish 
messages which will be compacted, not consumed individually from the beginning 
of time.
----
2020-05-19 07:42:39 UTC - Sijie Guo: @VanderChen at the producer side, you need 
to use keyBasedBatcher.
----
2020-05-19 07:42:53 UTC - Sijie Guo: otherwise, the messages are batched into 
one large batch.
----
2020-05-19 07:43:02 UTC - Sijie Guo: so the consumer 1 will receive all the 
messages.
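
A toy model of the explanation above, under two stated assumptions: a batch is handed to a single consumer as a unit, and that consumer is chosen from the key of the first message in the batch. It does not use the Pulsar client or reproduce broker internals, and every name in it is hypothetical. In the Java client, key-based batching is, as far as I know, selected on the producer builder with `batcherBuilder(BatcherBuilder.KEY_BASED)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: no Pulsar classes are used here.
final class KeyBatchingSketch {

    static final class Msg {
        final String key;
        final String payload;

        Msg(String key, String payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    // Default-style batching: everything published together lands in one batch.
    static List<List<Msg>> singleBatch(List<Msg> messages) {
        List<List<Msg>> batches = new ArrayList<>();
        batches.add(new ArrayList<>(messages));
        return batches;
    }

    // Key-based batching: one batch per distinct key.
    static List<List<Msg>> batchPerKey(List<Msg> messages) {
        Map<String, List<Msg>> byKey = new LinkedHashMap<>();
        for (Msg m : messages) {
            byKey.computeIfAbsent(m.key, k -> new ArrayList<>()).add(m);
        }
        return new ArrayList<>(byKey.values());
    }

    // Assumption: a whole batch goes to one consumer, picked from its first message's key.
    // Returns how many messages each (hypothetical) consumer received.
    static Map<String, Integer> deliver(List<List<Msg>> batches) {
        Map<String, Integer> received = new LinkedHashMap<>();
        for (List<Msg> batch : batches) {
            String consumer = "consumer-for-" + batch.get(0).key;
            received.merge(consumer, batch.size(), Integer::sum);
        }
        return received;
    }

    static List<Msg> sampleMessages() {
        return Arrays.asList(
                new Msg("key-1", "message-1-1"),
                new Msg("key-1", "message-1-2"),
                new Msg("key-1", "message-1-3"),
                new Msg("key-2", "message-2-1"),
                new Msg("key-2", "message-2-2"),
                new Msg("key-2", "message-2-3"));
    }

    public static void main(String[] args) {
        List<Msg> messages = sampleMessages();
        System.out.println("one big batch: " + deliver(singleBatch(messages)));
        System.out.println("batch per key: " + deliver(batchPerKey(messages)));
    }
}
```

With one big batch, the consumer for key-1 receives all six messages; with one batch per key, each consumer receives only its own three.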
----
2020-05-19 07:45:10 UTC - Sijie Guo: @Luke Stephenson Currently if you are 
producing much faster than the bookies can support, you might be experiencing 
OOM. In this case, you can set maxPendingRequests at the bookie side or enable 
rate throttling at the broker side.
----
2020-05-19 07:47:10 UTC - Sijie Guo: You need to manually set the replication 
clusters for the functions namespace. Otherwise one of the clusters is not able 
to start up. There is a GitHub issue tracking an improvement for this.
----
2020-05-19 07:47:58 UTC - Sijie Guo: You can use the same zookeeper instance.

3~5 nodes is a very typical setup.
----
2020-05-19 07:49:44 UTC - Sijie Guo: Pulsar’s wire protocol also has ping/pong 
messages to keep the connection alive. Your connection shouldn’t be closed 
every 60 seconds. Do you have more information about this issue?
----
2020-05-19 07:51:45 UTC - Sijie Guo: > Is it possible to have a retention 
policy which only clears up the “original” topic to prevent it from growing
The original data will be deleted/reclaimed based on its retention policy.
----
2020-05-19 07:59:43 UTC - Deepa: @Deepa has joined the channel
----
2020-05-19 08:07:41 UTC - Deepa: I ran the code below in debug mode, paused at
```System.out.println("wait!!!!");```
for more than 60 seconds, and then continued.
----
2020-05-19 08:07:58 UTC - Deepa: ```public static void main(String[] args)
        throws PulsarClientException, InterruptedException {
        PulsarClient pclient = PulsarClient.builder()
                .keepAliveInterval(3000, TimeUnit.SECONDS)
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Producer<byte[]> producer = pclient.newProducer()
                .topic("my-keepalive-topic")
                .create();

        for (int i = 0; i < 2; i++) {
            String msg = "asdfasdfad";
            producer.newMessage().value(msg.getBytes()).send();
            System.out.println("Produced Message: " + msg);
        }

        ProducerImpl<byte[]> test = (ProducerImpl<byte[]>) producer;
        System.out.println(test.getConnectionId());

        System.out.println("wait!!!!");

        for (int i = 0; i < 2; i++) {
            String msg = "qerqewrq";
            producer.newMessage().value(msg.getBytes()).send();
            System.out.println("Produced Message: " + msg);
        }

        ProducerImpl<byte[]> test2 = (ProducerImpl<byte[]>) producer;
        System.out.println(test2.getConnectionId());

        producer.close();
        pclient.close();
    }```

----
2020-05-19 08:09:02 UTC - Deepa: Output:
```Produced Message: asdfasdfad
Produced Message: asdfasdfad
[id: 0x2f333067, L:/127.0.0.1:50045 - R:localhost/127.0.0.1:6650]
wait!!!!
Produced Message: qerqewrq
Produced Message: qerqewrq
[id: 0x5588b1e0, L:/127.0.0.1:50053 - R:localhost/127.0.0.1:6650]```
Connection "127.0.0.1:50045" was closed after 60 seconds, and the second round 
of produced messages used a different connection, "127.0.0.1:50053".
----
2020-05-19 08:11:53 UTC - Deepa: A few log lines from the Pulsar standalone 
console that correlate with the output above:
```13:35:04.903 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:50045

13:35:05.272 [ForkJoinPool.commonPool-worker-6] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50045] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/my-keepalive-topic}, client=/127.0.0.1:50045, producerName=standalone-5-3, producerId=0}

13:36:04.904 [pulsar-io-50-12] WARN  org.apache.pulsar.common.protocol.PulsarHandler - [[id: 0xf0a6deef, L:/127.0.0.1:6650 - R:/127.0.0.1:50045]] Forcing connection to close after keep-alive timeout

13:36:04.904 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:50045

13:36:16.038 [pulsar-io-50-15] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:50053

13:36:16.043 [ForkJoinPool.commonPool-worker-2] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50053] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/my-keepalive-topic}, client=/127.0.0.1:50053, producerName=standalone-5-3, producerId=0}```

----
2020-05-19 08:21:54 UTC - Deepa: Is it because, in debug mode, the ping/pong 
keep-alive messages are also paused, so the connections get closed?
----
2020-05-19 08:27:50 UTC - Deepa: Is there an explicit option to keep a 
connection alive for a given amount of time?
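
The broker log earlier in this thread shows the connection being force-closed 60 seconds after it was opened. A minimal simulation of one keep-alive scheme that would produce that timing, assuming (this is not stated in the thread) that the broker sends a ping every 30 seconds and closes the connection when the previous ping is still unanswered at the next tick, and that a client suspended at a breakpoint never answers. All names are illustrative; this is not Pulsar code.

```java
// Simulated keep-alive check with whole-second ticks; not Pulsar code.
final class KeepAliveSketch {

    // Returns the second at which the connection is force-closed,
    // or -1 if it stays open for the whole simulated period.
    static int closedAtSecond(int intervalSeconds, int clientPausedFromSecond, int totalSeconds) {
        boolean waitingForPong = false;
        for (int now = intervalSeconds; now <= totalSeconds; now += intervalSeconds) {
            if (waitingForPong) {
                return now; // the previous ping was never answered: close the connection
            }
            // Send a ping. A running client answers at once; a paused client never does.
            boolean clientRunning = now < clientPausedFromSecond;
            waitingForPong = !clientRunning;
        }
        return -1;
    }

    public static void main(String[] args) {
        // Client paused almost immediately (for example, stopped at a breakpoint).
        System.out.println("paused client:  closed at " + closedAtSecond(30, 5, 300) + " s");
        // Client never paused.
        System.out.println("running client: closed at " + closedAtSecond(30, Integer.MAX_VALUE, 300) + " s");
    }
}
```

Under these assumptions the paused client is disconnected at the 60-second mark, and the running client is never disconnected (the sketch reports -1).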
----
2020-05-19 08:45:44 UTC - Olivier Chicha: Hi, I am having trouble running my 
integration tests inside Eclipse.
The tests work fine with Maven, but I can't get them to run as JUnit tests 
inside Eclipse.
I am getting this exception:
> Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodError: 'org.apache.pulsar.common.schema.SchemaInfo org.apache.pulsar.common.schema.SchemaInfo.setName(java.lang.String)'
>    at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:46)
>    at org.apache.pulsar.client.internal.DefaultImplementation.newBytesSchema(DefaultImplementation.java:136)
>    at org.apache.pulsar.client.api.Schema.<clinit>(Schema.java:149)
>    ... 38 more
> Caused by: java.lang.NoSuchMethodError: 'org.apache.pulsar.common.schema.SchemaInfo org.apache.pulsar.common.schema.SchemaInfo.setName(java.lang.String)'
>    at org.apache.pulsar.client.impl.schema.BytesSchema.<clinit>(BytesSchema.java:35)
>    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>    at java.base/java.lang.Class.newInstance(Class.java:584)
>    at org.apache.pulsar.client.internal.DefaultImplementation.lambda$10(DefaultImplementation.java:138)
>    at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:35)
>    ... 40 more
It looks like this is related to the fact that `setName` in SchemaInfo is 
declared via a Lombok annotation on the name attribute.
I have installed the Lombok plugin in my Eclipse, but it still does not work.
Any idea how to solve this?
----
