2020-07-31 11:52:13 UTC - Adriaan de Haan: no, I don't have ackTimeout enabled 
- I'd rather not get unacked messages redelivered. But all messages are 
immediately ack'ed after consumption
----
2020-07-31 11:52:17 UTC - Adriaan de Haan: ```normalConsumer = client.newConsumer(JSONSchema.of(SendSmsRequest.class))
                .topic(Configurations.getPulsarIncomingTopic())
                .receiverQueueSize(20000)
                .batchReceivePolicy(BatchReceivePolicy.builder()
                        .maxNumBytes(1024 * 1024 * 10)
                        .maxNumMessages(100)
                        .timeout(100, TimeUnit.MILLISECONDS).build())
                .subscriptionName(NORMAL_SUBSCRIPTION_NAME)
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();```
----
2020-07-31 11:54:04 UTC - Adriaan de Haan: Note that this is happening at a 
very low TPS, and I can see that each of the messages is actually processed 
successfully.
----
2020-07-31 11:54:38 UTC - Adriaan de Haan: Could this be caused by a version 
conflict between producer and consumer pulsar client versions?
----
2020-07-31 11:55:27 UTC - Adriaan de Haan: I have recently upgraded from 2.5.2 
to 2.6.0 - pretty sure everything is on 2.6.0 now but wondering if a version 
conflict could cause something like this?
----
2020-07-31 12:06:57 UTC - Adriaan de Haan: This is my producer:
```producer = createPulsarClient().newProducer(JSONSchema.of(SendSmsRequest.class))
                    .topic(pulsarSendTopic)
                    .maxPendingMessages(20000)
                    .batchingMaxMessages(10000)
                    .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS)
                    .blockIfQueueFull(true)
                    .compressionType(CompressionType.LZ4) // Enable compression
                    .create();```
----
2020-07-31 12:08:13 UTC - Adriaan de Haan: and this is my consumption (in a 
loop):
```Message<SendSmsRequest> message = normalConsumer.receive(60, TimeUnit.SECONDS);
                // For now ack all messages
                if (message == null)
                {
                    ServiceLogger.log(Priority.INFO, "No new messages... waiting");
                    continue;
                }
                normalConsumer.acknowledge(message);```
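As an aside, since the consumer above configures a `batchReceivePolicy`, the loop could also drain whole batches via `batchReceive()`; a minimal sketch, reusing the names from the snippets above (`process()` is a hypothetical handler, and this needs a live broker to run):

```java
// Sketch: batch variant of the loop above. batchReceive() honors the
// BatchReceivePolicy (maxNumMessages / maxNumBytes / timeout) set on the consumer.
Messages<SendSmsRequest> batch = normalConsumer.batchReceive();
for (Message<SendSmsRequest> msg : batch) {
    process(msg.getValue());          // process() is a placeholder for the real handler
    normalConsumer.acknowledge(msg);  // ack each message after successful processing
}
```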
----
2020-07-31 12:27:40 UTC - Adriaan de Haan: I create 24 consumers on different 
threads, and on each thread I also create a new client. What is the 
recommended way to do this - create a singleton client and re-use it from 
different threads? Or does it not make much difference for a small enough 
number of threads?
----
2020-07-31 13:44:29 UTC - Kevin Barresi: Hello,

I am using the Streamnative Pulsar Flink connector to create a Pulsar source. 
I'm a bit confused about how subscriptions are working. Ideally, I would like 
to have a single subscription (we'll call it `my-sub` ), which is persistent 
and can handle Flink job restarts without losing unacked messages.

Currently, I connect by calling `setStartFromSubscription("my-sub")` on my 
`FlinkPulsarSource` object. However, what I see in the manager UI is that two 
subscriptions are created when the job first starts: `my-sub` (with a blank 
type) and `my-sub-reader-<uuid>` (with an "exclusive" type). Now, the 
`my-sub` subscription looks like it's doing what I want, in the sense that it 
stays open even after the job has ended. However, when I restart the job and 
keep adding messages:
• The `my-sub` backlog grows without end
• Upon starting the job again, a new `my-sub-reader-<uuid>` is created 
with the full backlog of `my-sub`
I would expect that as messages are consumed, the backlog of both subscriptions 
would decrease, and upon restarting the job, only messages that were unprocessed 
would be in the backlog. Any idea what I'm doing wrong here?
----
2020-07-31 14:45:36 UTC - Addison Higham: @Kevin Barresi I think there is a bit 
of confusion about how that connector works. First, a bit of background: the 
streamnative pulsar connector uses the `Reader` API. The reason is that a 
reader is a better model for letting Flink get exactly-once inputs from 
Pulsar: Flink controls the offsets, so in the event of failure it resets the 
reader to the exact place it wants to resume. That means we don't really want 
to use a Pulsar subscription to track the state; we use Flink state instead. 
However, when you checkpoint, it does update that subscription.

~In other words, flink won't use `my-sub` at all. The purpose of 
`setStartFromSubscription` is a bit nuanced. Specifically, if you have a job 
you want to start at an exact offset (say, if you want to move a job to a new 
flink cluster), you should be able to start the job from an exact set of 
offsets. With partitioned topics, the offsets can be a bit cumbersome to 
communicate. The purpose of `setStartFromSubscription` is that you can create a 
subscription positioned exactly where you want it and use that subscription as 
the mechanism to communicate the offsets to your flink job. One detail though: 
it isn't really expected that you always have that option set, as it overrides 
your job from using your stored offsets and instead always falls back to the 
subscription, which is why you see the behavior.~

EDIT: see follow on comment, I was wrong here
----
2020-07-31 15:00:55 UTC - Kevin Barresi: Ok I see - in that case, it sounds 
like I don't need to be using `setStartFromSubscription` at all. However, when 
I don't use it, each time my job starts it creates a pair of new 
subscriptions: `flink-pulsar-<uuid>` and 
`flink-pulsar-<uuid>-reader-<uuid>`. When the job stops, the 
`flink-pulsar-<uuid>-reader-<uuid>` subscription is removed but 
the `flink-pulsar-<uuid>` subscription remains. And then when the job is 
started again, a new pair of subscriptions is used (with no backlog). Wouldn't 
I want to connect to the same subscription when the job is stopped/started, to 
ensure the backlog contains messages published while the job wasn't running?
----
2020-07-31 15:07:08 UTC - Addison Higham: I may have a bit of a 
misunderstanding myself and said something not completely true; this code has 
changed a fair bit since I last used it.

One question for you though, do you have checkpointing enabled in your job?
----
2020-07-31 15:26:21 UTC - Addison Higham: @Kevin Barresi sorry for some 
misinformation. What I said about using flink state was correct, but I was 
wrong about the details of the external subscription: in addition to writing 
flink state, the connector *also* updates the subscriptions when you 
checkpoint. So if you do use an external subscription, it should be safe to 
continue using it and have it be updated.

Likewise, if you don't use an external subscription, the new subscription it 
creates is updated.

Once again, it actually restores from flink state, but the subscriptions are 
updated to give you some idea of where the job is, and they also make 
restarting a job without flink state (like when migrating a cluster) much 
easier.

But all of this depends on you having checkpointing enabled and running 
regularly. If you don't have checkpointing, that is the behavior I would expect.
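For reference, a minimal sketch of enabling checkpointing on the Flink job (the 60-second interval is an arbitrary example value):

```java
// Sketch: enable Flink checkpointing so the Pulsar connector commits its
// position back to the subscription on each completed checkpoint.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60 seconds (example interval)
```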
----
2020-07-31 15:42:28 UTC - Kalyn Coose: Hey @Chris Hansen, I'm getting a 500 
Internal Server Error when trying to create a Java Pulsar Function, do you have 
any advice? Output:
```[pulsar]$ bin/pulsar-admin functions create --name GPRouter -i 
all-global-partners --jar /tmp/pulsar-functions.jar --classname GPRouter
HTTP 500 Internal Server Error

Reason: HTTP 500 Internal Server Error```
I don't think it's a permissions issue with my jar file, I'm going to check 
broker logs now...
----
2020-07-31 15:48:50 UTC - Sijie Guo: You can use a single client. The Pulsar 
client uses thread pools and a connection pool internally, so you don't need to 
create multiple clients.
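A sketch of the single-client pattern (the service URL, topic, and subscription name are placeholders, and this needs a running broker):

```java
// Sketch: one shared PulsarClient for the whole process; each worker thread
// builds its own Consumer from it, sharing the client's internal pools.
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")       // placeholder URL
        .build();

// Called from each of the worker threads:
Consumer<SendSmsRequest> consumer = client.newConsumer(JSONSchema.of(SendSmsRequest.class))
        .topic("persistent://public/default/sms-in") // placeholder topic
        .subscriptionName("normal-sub")              // placeholder name
        .subscriptionType(SubscriptionType.Shared)
        .subscribe();
```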
----
2020-07-31 15:49:36 UTC - Sijie Guo: Currently, acks in Pulsar are 
best-effort. There is no guarantee that an ack reaches the broker, even when 
you call consumer#acknowledge.
----
2020-07-31 15:49:57 UTC - Kalyn Coose: I found my issue:
```java.lang.UnsupportedClassVersionError: GPRouter has been compiled by a more 
recent version of the Java Runtime (class file version 58.0), this version of 
the Java Runtime only recognizes class file versions up to 52.0```
Welp
----
2020-07-31 15:50:14 UTC - Sijie Guo: Hence, if acks are missed, it can 
result in holes in individualDeletedMessages, which can cause the storage 
size to grow.
----
2020-07-31 15:51:00 UTC - Sijie Guo: Hence I would recommend enabling 
ackTimeout
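Applied to the consumer from the earlier snippet, that would look roughly like this (a sketch only; 60 seconds is an example value, and it needs a live client/broker):

```java
// Sketch: same consumer as earlier, with an ack timeout added so messages
// not acked within 60s are redelivered instead of leaving permanent ack holes.
Consumer<SendSmsRequest> consumer = client.newConsumer(JSONSchema.of(SendSmsRequest.class))
        .topic(Configurations.getPulsarIncomingTopic())
        .subscriptionName(NORMAL_SUBSCRIPTION_NAME)
        .subscriptionType(SubscriptionType.Shared)
        .ackTimeout(60, TimeUnit.SECONDS) // example value; minimum is 1 second
        .subscribe();
```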
----
2020-07-31 16:31:20 UTC - Kevin Barresi: @Addison Higham - it turns out I did 
not have checkpointing enabled. When I did enable checkpointing and used 
`setStartFromSubscription`, it performed as expected! The `my-sub` subscription 
persisted across job restarts, and the `my-sub-reader-<uuid>` subscription 
that is created for each job correctly pulls messages off the main `my-sub` 
subscription. Thank you for your help!
----
2020-07-31 16:32:18 UTC - Addison Higham: :bow: if there is some doc 
improvements that could help, contributions are always welcome 
:slightly_smiling_face:
+1 : Kevin Barresi
----
2020-07-31 16:37:07 UTC - puneetkhanal: Hi,
I am using the pulsar delayed-delivery feature and I am seeing delayed 
messages get stuck and never delivered when running Pulsar in a Docker stack. 
There is no error message or anything; the message is simply never delivered. 
This happens only sometimes; most of the time it works. What might be the issue?
----
2020-07-31 16:50:30 UTC - Chris Hansen: Anyone know why `topics stats-internal` 
would show 3 messages being consumed by a function while `functions stats` 
shows 0? All indications are that my function is being created but not being 
called.
----
2020-07-31 17:19:44 UTC - Peter Newell: @Peter Newell has joined the channel
----
2020-07-31 17:49:15 UTC - Walter: @Addison Higham there is a mismatch between 
the ledger count from listunderreplicated and the count of ledgers 
rereplicated during decommission
----
2020-07-31 17:52:55 UTC - Feanil: @Feanil has joined the channel
----
2020-07-31 20:24:18 UTC - krishna reddy jaggannolla: Hello, how can we check if 
a subscriber has not been active for 7 days?
----
2020-07-31 20:24:29 UTC - krishna reddy jaggannolla: or the exact number of 
days doesn't matter
----
2020-07-31 21:14:34 UTC - Sijie Guo: Did you see the result produced to the 
output topic?
----
2020-07-31 21:15:02 UTC - Sijie Guo: We need to identify if this is a function 
stats issue or not.
----
2020-07-31 21:33:46 UTC - Chris Hansen: no result produced to the output topic
----
2020-07-31 21:34:44 UTC - Chris Hansen: I can create a minimal project to 
reproduce if that helps
----
2020-07-31 22:14:47 UTC - Sijie Guo: Yeah. That would be better.
+1 : Chris Hansen
----
2020-07-31 23:48:46 UTC - Chris Hansen: Here it is: 
<https://github.com/hansenc/pulsar-scratch>
I included some instructions in the README but let me know if you have any 
questions
----
