2020-07-31 11:52:13 UTC - Adriaan de Haan: no, I don't have ackTimeout enabled
- I'd rather not get unacked messages redelivered. But all messages are
immediately ack'ed after consumption
----
2020-07-31 11:52:17 UTC - Adriaan de Haan: ```normalConsumer =
client.newConsumer(JSONSchema.of(SendSmsRequest.class))
.topic(Configurations.getPulsarIncomingTopic())
.receiverQueueSize(20000)
.batchReceivePolicy(BatchReceivePolicy.builder()
.maxNumBytes(1024 * 1024 * 10)
.maxNumMessages(100)
.timeout(100, TimeUnit.MILLISECONDS).build())
.subscriptionName(NORMAL_SUBSCRIPTION_NAME)
.subscriptionType(SubscriptionType.Shared)
.subscribe();```
----
2020-07-31 11:54:04 UTC - Adriaan de Haan: Note that this is happening at a
very low TPS, and I can see that each of the messages is actually successfully
processed.
----
2020-07-31 11:54:38 UTC - Adriaan de Haan: Could this be caused by a version
conflict between producer and consumer pulsar client versions?
----
2020-07-31 11:55:27 UTC - Adriaan de Haan: I have recently upgraded from 2.5.2
to 2.6.0 - pretty sure everything is on 2.6.0 now but wondering if a version
conflict could cause something like this?
----
2020-07-31 12:06:57 UTC - Adriaan de Haan: This is my producer:
```producer =
createPulsarClient().newProducer(JSONSchema.of(SendSmsRequest.class))
.topic(pulsarSendTopic)
.maxPendingMessages(20000).batchingMaxMessages(10000)
.batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS)
.blockIfQueueFull(true)
.compressionType(CompressionType.LZ4) // Enable compression
.create();```
----
2020-07-31 12:08:13 UTC - Adriaan de Haan: and this is my consumption (in a
loop):
```Message<SendSmsRequest> message = normalConsumer.receive(60,
TimeUnit.SECONDS);
// For now ack all messages
if (message == null)
{
        ServiceLogger.log(Priority.INFO, "No new messages... waiting");
continue;
}
normalConsumer.acknowledge(message);```
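Since the consumer above configures a `batchReceivePolicy`, a batch-oriented loop is a possible alternative to single `receive` calls. A sketch for the same loop, assuming the Pulsar 2.6 `Consumer#batchReceive` API (the `process` call is a hypothetical placeholder for the processing step):
```// Sketch: batch consumption using the batchReceivePolicy configured above.
// batchReceive() blocks until the policy's timeout, maxNumMessages, or
// maxNumBytes limit is hit, then returns the accumulated messages.
Messages<SendSmsRequest> batch = normalConsumer.batchReceive();
if (batch.size() == 0)
{
    ServiceLogger.log(Priority.INFO, "No new messages... waiting");
    continue;
}
for (Message<SendSmsRequest> msg : batch)
{
    process(msg.getValue()); // hypothetical processing step
}
normalConsumer.acknowledge(batch); // acks the whole batch at once```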
----
2020-07-31 12:27:40 UTC - Adriaan de Haan: I create 24 consumers on different
threads, on each of the threads I also create a new client. What is the
recommended way to do it - create a singleton client and re-use it from
different threads? Or doesn't it make much difference for a small enough amount
of threads?
----
2020-07-31 13:44:29 UTC - Kevin Barresi: Hello,
I am using the Streamnative Pulsar Flink connector to create a Pulsar source.
I'm a bit confused about how subscriptions are working. Ideally, I would like
to have a single subscription (we'll call it `my-sub` ), which is persistent
and can handle Flink job restarts without losing unacked messages.
Currently, I connect by calling `setStartFromSubscription("my-sub")` on my
`FlinkPulsarSource` object. However, what I see in the manager UI is that two
subscriptions are created when the job first starts: `my-sub` (with a blank
type) and `my-sub-reader-<uuid>` (with an "exclusive" type). Now, the
`my-sub` subscription looks like it's doing what I want, in the sense that it
stays open even after the job is ended. However, when I restart the job and
keep adding messages:
• The `my-sub` backlog grows without end
• Upon starting the job again, a new `my-sub-reader-<uuid>` is created
with the full backlog of `my-sub`
I would expect that as messages are consumed, the backlog of both subscriptions
is decreased, and upon restarting the job, only messages that were unprocessed
would be in the backlog. Any idea what I'm doing wrong here?
----
2020-07-31 14:45:36 UTC - Addison Higham: @Kevin Barresi I think there is a bit
of confusion on how that connector works. First, a bit of background: the
streamnative pulsar connector uses the `Reader` API. The reason is that the
reader is a better model for allowing flink to get exactly-once inputs from
Pulsar, as we let Flink control the offsets, so in the event of failure it will
reset the reader to the exact place it wants to resume. That means we don't
really want to use a pulsar subscription to track the state; we instead need to
use flink state. However, when you checkpoint, it does update that subscription
~In other words, flink won't use `my-sub` at all. The purpose of
`setStartFromSubscription` is a bit nuanced. Specifically, if you do have a job
you want to start at an exact offset (say if you want to move a job to a new
flink cluster) then you should be able to start a job from an exact set of
offsets. With partitioned topics, the offsets can be a bit cumbersome to
communicate. The purpose of that `setStartFromSubscription` is you can create a
subscription that is exactly where you want it and use that subscription as the
mechanism to communicate the offsets to your flink job. One detail though: it
isn't really expected that you always have that option set, as it overrides
your job from using your stored offsets and instead always falls back to the
subscription, which is why you see the behavior.~
EDIT: see follow on comment, I was wrong here
----
2020-07-31 15:00:55 UTC - Kevin Barresi: Ok I see - in that case, it sounds
like I don't need to be using `setStartFromSubscription` at all. However, when
I don't use that, each time my job starts, it creates a pair of new
subscriptions: `flink-pulsar-<uuid>` and
`flink-pulsar-<uuid>-reader-<uuid>`. When the job stops, the
`flink-pulsar-<uuid>-reader-<uuid>` subscription is removed but
the `flink-pulsar-<uuid>` subscription remains. And then when the job is
started again, a new pair of subscriptions is used (with no backlog). Wouldn't
I want to connect to the same subscription if the job is stopped/started to
ensure the backlog contains messages published while the job wasn't running?
----
2020-07-31 15:07:08 UTC - Addison Higham: I may have a bit of a
misunderstanding myself and said something not completely true, this code has
changed a fair bit since I last used it.
One question for you though, do you have checkpointing enabled in your job?
----
2020-07-31 15:26:21 UTC - Addison Higham: @Kevin Barresi sorry for some
misinformation: what I said about using flink state was correct, but I was
wrong about the details of the external subscription. In addition to writing
flink state, the connector *also* updates the subscriptions when you
checkpoint, so if you do use an external subscription, it should be safe to
continue using it and have it be updated.
Likewise, if you don't use an external subscription, the new subscription it
creates is updated.
Once again, it actually restores from flink state, but the subscriptions are
updated to give you some idea of where the job is; this also makes restarting a
job without flink state (like when migrating a cluster) much easier.
But all of this depends on you having checkpointing enabled and running
regularly; if you don't have checkpointing, that is the behavior I would expect
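For reference, enabling checkpointing is one call on the Flink execution environment; a minimal sketch, where the 30-second interval is an arbitrary assumption:
```// Checkpointing must be on for the connector to advance the subscription;
// offsets are committed on each completed checkpoint.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30_000); // checkpoint every 30 seconds (example value)
// source.setStartFromSubscription("my-sub"); // optional external subscription```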
----
2020-07-31 15:42:28 UTC - Kalyn Coose: Hey @Chris Hansen, I'm getting a 500
Internal Server Error when trying to create a Java Pulsar Function, do you have
any advice? Output:
```[pulsar]$ bin/pulsar-admin functions create --name GPRouter -i
all-global-partners --jar /tmp/pulsar-functions.jar --classname GPRouter
HTTP 500 Internal Server Error
Reason: HTTP 500 Internal Server Error```
I don't think it's a permissions issue with my jar file, I'm going to check
broker logs now...
----
2020-07-31 15:48:50 UTC - Sijie Guo: You can use a single client. Pulsar uses
thread pools and a connection pool internally. You don’t need to create
multiple clients.
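A minimal sketch of that pattern (one shared client, a consumer per worker thread); the service URL, thread-pool sizes, and the empty receive loop are illustrative assumptions:
```// One PulsarClient for the whole process; it multiplexes broker
// connections and owns the shared IO/listener thread pools.
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650") // assumed broker URL
        .ioThreads(4)
        .listenerThreads(4)
        .build();

// Each worker thread creates its own Consumer from the shared client.
int workerCount = 24; // matches the 24 threads mentioned above
for (int i = 0; i < workerCount; i++) {
    new Thread(() -> {
        try (Consumer<SendSmsRequest> consumer =
                 client.newConsumer(JSONSchema.of(SendSmsRequest.class))
                       .topic(Configurations.getPulsarIncomingTopic())
                       .subscriptionName(NORMAL_SUBSCRIPTION_NAME)
                       .subscriptionType(SubscriptionType.Shared)
                       .subscribe()) {
            // ... receive/ack loop as above ...
        } catch (PulsarClientException e) {
            // handle/log the failure
        }
    }).start();
}```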
----
2020-07-31 15:49:36 UTC - Sijie Guo: Currently the ack in Pulsar is best
effort. It doesn’t guarantee that the ack reaches the brokers, even when you
call consumer#acknowledge.
----
2020-07-31 15:49:57 UTC - Kalyn Coose: I found my issue:
```java.lang.UnsupportedClassVersionError: GPRouter has been compiled by a more
recent version of the Java Runtime (class file version 58.0), this version of
the Java Runtime only recognizes class file versions up to 52.0```
Welp
----
2020-07-31 15:50:14 UTC - Sijie Guo: Hence, if acks are missed, it might
result in holes in individuallyDeletedMessages, which can cause the storage
size to grow.
----
2020-07-31 15:51:00 UTC - Sijie Guo: Hence I would recommend enabling
ackTimeout
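On the consumer builder that is a single extra setting; a sketch, where the 60-second window is an arbitrary assumption:
```// With an ack timeout, any message not acked within the window is
// redelivered, so a lost best-effort ack no longer leaves a permanent
// hole in individuallyDeletedMessages.
normalConsumer = client.newConsumer(JSONSchema.of(SendSmsRequest.class))
        .topic(Configurations.getPulsarIncomingTopic())
        .subscriptionName(NORMAL_SUBSCRIPTION_NAME)
        .subscriptionType(SubscriptionType.Shared)
        .ackTimeout(60, TimeUnit.SECONDS) // redeliver if not acked in 60s
        .subscribe();```
Note that with this enabled, slow processing can trigger redelivery, so the timeout should comfortably exceed the worst-case processing time per message.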
----
2020-07-31 16:31:20 UTC - Kevin Barresi: @Addison Higham - it turns out I did
not have checkpointing enabled. When I did enable checkpointing and used
`setStartFromSubscription`, it performed as expected! The `my-sub` subscription
persisted across job restarts, and the `my-sub-reader-<uuid>` subscription that
is created for each job correctly pulls messages off the main `my-sub`
subscription. Thank you for your help!
----
2020-07-31 16:32:18 UTC - Addison Higham: :bow: if there is some doc
improvements that could help, contributions are always welcome
:slightly_smiling_face:
+1 : Kevin Barresi
----
2020-07-31 16:37:07 UTC - puneetkhanal: Hi,
I am using the pulsar delay feature and I am seeing delayed messages get stuck
and never delivered when using pulsar in a docker stack. There is no error
message or anything; the message is just not delivered at all. This happens
only sometimes; most of the time it works. What might be the issue?
----
2020-07-31 16:50:30 UTC - Chris Hansen: Anyone know why `topics stats-internal`
would show 3 messages being consumed by a function while `functions stats`
shows 0? All indications are my function is being created but not being called.
----
2020-07-31 17:19:44 UTC - Peter Newell: @Peter Newell has joined the channel
----
2020-07-31 17:49:15 UTC - Walter: @Addison Higham there is a mismatch between
the ledger count from listunderreplicated and the count rereplicated during
decommission
----
2020-07-31 17:52:55 UTC - Feanil: @Feanil has joined the channel
----
2020-07-31 20:24:18 UTC - krishna reddy jaggannolla: Hello, how can we check if
the subscriber has not been active for 7 days?
----
2020-07-31 20:24:29 UTC - krishna reddy jaggannolla: or it doesn't have to be days
----
2020-07-31 21:14:34 UTC - Sijie Guo: Did you see the result produced to the
output topic?
----
2020-07-31 21:15:02 UTC - Sijie Guo: We need to identify if this is a function
stats issue or not.
----
2020-07-31 21:33:46 UTC - Chris Hansen: no result produced to the output topic
----
2020-07-31 21:34:44 UTC - Chris Hansen: I can create a minimal project to
reproduce if that helps
----
2020-07-31 22:14:47 UTC - Sijie Guo: Yeah. That would be better.
+1 : Chris Hansen
----
2020-07-31 23:48:46 UTC - Chris Hansen: Here it is:
<https://github.com/hansenc/pulsar-scratch>
I included some instructions in the README but let me know if you have any
questions
----