2019-05-30 10:18:20 UTC - Chris Camplejohn: Hi All, has anyone got examples of 
the best way to dequeue things from Pulsar in order to achieve maximum 
throughput? We are currently calling receive, but we then have to write to 
Kafka which is quite time consuming. Ordering is important to us, so I am not 
sure we can use the async API as I believe that would potentially jumble the 
ordering?
----
2019-05-30 15:18:31 UTC - Addison Higham: have you considered using the kafka 
sink? <https://pulsar.apache.org/docs/en/io-kafka/#sink>
----
2019-05-30 15:18:58 UTC - Addison Higham: if you are doing some munging, you 
could use a pulsar function beforehand
----
2019-05-30 15:22:10 UTC - Addison Higham: but I would *think* that an async 
receive should still resolve in order, but you would probably want to validate 
that
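
One way to validate the ordering concern without a broker is a minimal JDK-only sketch like the one below. It lets slow downstream writes overlap while releasing acknowledgements strictly in receive order. `InOrderAcker` and its methods are hypothetical names, not part of the Pulsar or Kafka client APIs, and the downstream write is represented by a plain `CompletableFuture`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper: tracks in-flight writes and releases acks in receive order. */
public class InOrderAcker {
    /** Pairs a message id with the future of its downstream write. */
    private static final class InFlight {
        final long messageId;
        final CompletableFuture<Void> write;

        InFlight(long messageId, CompletableFuture<Void> write) {
            this.messageId = messageId;
            this.write = write;
        }
    }

    private final Deque<InFlight> pending = new ArrayDeque<>();
    private final List<Long> acked = new ArrayList<>();

    /** Register a message in the order it was received. */
    public synchronized void track(long messageId, CompletableFuture<Void> write) {
        pending.addLast(new InFlight(messageId, write));
        // Re-check the head of the queue whenever any write succeeds.
        write.thenRun(this::drain);
    }

    /** Ack the longest prefix of messages whose writes completed successfully. */
    private synchronized void drain() {
        while (!pending.isEmpty()
                && pending.peekFirst().write.isDone()
                && !pending.peekFirst().write.isCompletedExceptionally()) {
            acked.add(pending.pollFirst().messageId);
        }
    }

    /** Snapshot of the ids acknowledged so far, in acknowledgement order. */
    public synchronized List<Long> ackedInOrder() {
        return new ArrayList<>(acked);
    }
}
```

In this sketch a failed write stops acknowledgements at that message, so nothing after it is acked out of order; a real pipeline would still need its own retry or redelivery handling.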
----
2019-05-30 18:56:40 UTC - Ambud Sharma: @Matteo Merli how would ordering be 
honored on the consumer side for a partitioned topic?
Shared subscription seems to be the only way to run multiple consumer instances 
to subscribe to a topic however as per the documentation messages are delivered 
in a round robin fashion so is ordering no longer preserved?
----
2019-05-30 18:58:32 UTC - Matteo Merli: @Ambud Sharma With failover 
subscription over a partitioned topic, there will be one active consumer 
per-partition, though the partitions will be spread across all active consumers. 
It’s very similar to Kafka consumer group mode.
----
2019-05-30 19:03:40 UTC - Ambud Sharma: thanks @Matteo Merli; is there an 
example that shows how to initialize a Pulsar consumer to mimic the Kafka 
consumer behavior? Specifically looking for consumer name pattern and topic 
name pattern that needs to be set (if any).
----
2019-05-30 19:06:01 UTC - Matteo Merli: Setting the 
`subscriptionType(SubscriptionType.Failover)` should be enough.

The consumerName can be used to control the sorting of consumers, though it 
wouldn’t be needed if you just want to match N consumers with M partitions.
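
To make the N-consumers-over-M-partitions idea concrete, here is a simplified model in plain Java. It is not broker code: the class name is hypothetical, and the rule it uses (sort consumers by name, then give partition `i` to index `i % N`) is an assumption for illustration that should be checked against the Pulsar version in use. The point it shows is that each partition has exactly one active consumer, which is what keeps per-partition ordering.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of failover assignment; not Pulsar broker code. */
public class FailoverAssignmentSketch {
    /**
     * Maps each partition index to one active consumer.
     * Assumption: consumers are sorted by name and partition i goes to index i % N.
     */
    public static Map<Integer, String> assign(List<String> consumerNames, int partitions) {
        if (consumerNames.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<String> sorted = new ArrayList<>(consumerNames);
        Collections.sort(sorted);
        Map<Integer, String> active = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++) {
            // Exactly one active consumer per partition.
            active.put(p, sorted.get(p % sorted.size()));
        }
        return active;
    }
}
```

For example, `FailoverAssignmentSketch.assign(Arrays.asList("consumer-b", "consumer-a"), 4)` alternates the four partitions between the two consumers under this model.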
----
2019-05-30 19:43:49 UTC - Ambud Sharma: makes sense, thank you
----
2019-05-30 22:30:33 UTC - Matt Grosso: I'm curious if there are folks here 
using pulsar in k8s in production today?  or are most people deploying into 
bare VMs?
----
2019-05-30 22:32:55 UTC - Matteo Merli: We do use Pulsar with Kubernetes in 
Prod :slightly_smiling_face:
----
2019-05-30 23:14:20 UTC - Matt Grosso: awesome. we were trying to decide which 
approach was more likely to be the happy path and persistent services in k8s 
still have a lot of FUD
----
2019-05-30 23:28:09 UTC - Addison Higham: huh... so this is weird, spinning up 
a new cluster on k8s using the helm chart and am having issues with my proxy 
instances not being able to listen on port 8080. AFAICT, there isn't anything 
else conflicting with that port
----
2019-05-30 23:39:07 UTC - Addison Higham: yeah, so i keep getting this:
```
23:38:22.884 [main] INFO  org.eclipse.jetty.util.thread.ThreadPoolBudget - 
SelectorManager@ServerConnector@35764bef{HTTP/1.1,[http/1.1]}{0.0.0.0:8080} 
requires 1 threads from 
WebExecutorThreadPool[etp244872973]@e98770d{STARTED,4<=4<=4,i=3,q=0,ReservedThreadExecutor@216914{s=0/1,p=0}}
2019-05-30 11:38:22,885 [sun.misc.Launcher$AppClassLoader@4534b60d] error 
Uncaught exception in thread main: Failed to start HTTP server on ports [8080]
```
----
2019-05-30 23:44:25 UTC - Addison Higham: ahhhh okay appears to be this: 
<https://github.com/apache/pulsar/issues/4359>
----
2019-05-30 23:44:27 UTC - Addison Higham: got it
----
2019-05-31 00:12:45 UTC - Devin G. Bost: We had an OOM error on our Redis cache 
that we are mostly recovered from, but our Redis sink is accumulating backlog. 
I checked the sink logs, and I found this:

```
    12:57:14,581 INFO 
[redactedTenant/redactedNamespace/redacted-redis-cache-sink-1] [instance: 1] 
JavaInstanceRunnable - Encountered exception in source read: 
    java.lang.InterruptedException: null
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
 ~[?:1.8.0_181]
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
 ~[?:1.8.0_181]
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) 
~[?:1.8.0_181]
        at org.apache.pulsar.io.core.PushSource.read(PushSource.java:49) 
~[org.apache.pulsar-pulsar-io-core-2.3.1.jar:2.3.1]
ERROR: Minions returned with non-zero exit code
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:444)
 [org.apache.pulsar-pulsar-functions-instance-2.3.1.jar:2.3.1]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:239)
 [org.apache.pulsar-pulsar-functions-instance-2.3.1.jar:2.3.1]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
    12:57:14,582 ERROR 
[osp/auctioned-product-impressions/campaign-auction-metadata-cache-sink-1] 
[instance: 1] JavaInstanceRunnable - 
[osp/auctioned-product-impressions/campaign-auction-metadata-cache-sink:1] 
Uncaught exception in Java Instance
    java.lang.RuntimeException: java.lang.InterruptedException
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:448)
 ~[org.apache.pulsar-pulsar-functions-instance-2.3.1.jar:2.3.1]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:239)
 [org.apache.pulsar-pulsar-functions-instance-2.3.1.jar:2.3.1]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
    Caused by: java.lang.InterruptedException
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
 ~[?:1.8.0_181]
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
 ~[?:1.8.0_181]
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) 
~[?:1.8.0_181]
        at org.apache.pulsar.io.core.PushSource.read(PushSource.java:49) 
~[org.apache.pulsar-pulsar-io-core-2.3.1.jar:2.3.1]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:444)
 ~[org.apache.pulsar-pulsar-functions-instance-2.3.1.jar:2.3.1]
        ... 2 more
    12:57:14,583 INFO 
[osp/auctioned-product-impressions/campaign-auction-metadata-cache-sink-1] 
[instance: 1] JavaInstanceRunnable - Closing instance
    12:57:14,591 INFO 
[osp/auctioned-product-impressions/campaign-auction-metadata-cache-sink-1] 
[instance: 1] JavaInstanceRunnable - Unloading JAR files for function 
InstanceConfig(instanceId=1, functionId=091975a8-ef20-4b64-9ed8-840e158eed0f, 
functionVersion=76feb47c-8b18-4848-8298-61c334547053, functionDetails . . .
```

Any ideas what `java.lang.InterruptedException: null` could be associated with?
----
2019-05-31 00:24:47 UTC - Devin G. Bost: Also, what's the difference between 
unacked messages and backlog messages?
----
2019-05-31 00:25:45 UTC - Matteo Merli: backlog -> total number of messages 
to be consumed

unacked -> messages pushed from broker to consumer, but not yet acked
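
The two definitions can be illustrated with a toy counter model for a single subscription. The class and method names are hypothetical, and the model assumes that backlog counts everything not yet acknowledged, so it includes the unacked messages.

```java
/** Toy model of one subscription's counters; names are hypothetical. */
public class SubscriptionCounters {
    private long published;   // messages written to the topic
    private long dispatched;  // messages pushed from broker to consumer
    private long acked;       // messages acknowledged by the consumer

    public void publish(long n) {
        published += n;
    }

    public void dispatch(long n) {
        // The broker cannot push more than what has been published.
        dispatched = Math.min(published, dispatched + n);
    }

    public void ack(long n) {
        // Only messages that were dispatched can be acknowledged.
        acked = Math.min(dispatched, acked + n);
    }

    /** Messages still to be consumed (not yet acknowledged). */
    public long backlog() {
        return published - acked;
    }

    /** Messages pushed to the consumer but not yet acknowledged. */
    public long unacked() {
        return dispatched - acked;
    }
}
```

With 10 messages published, 4 dispatched, and 1 acknowledged, this model reports a backlog of 9 and 3 unacked messages.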
----
2019-05-31 00:30:23 UTC - Devin G. Bost: Thanks.
----
2019-05-31 01:13:50 UTC - Devin G. Bost: We have a source that's giving us:

`"org.apache.pulsar.client.api.PulsarClientException$ProducerBlockedQuotaExceededError:
 Cannot create producer on topic with backlog quota exceeded"`

BTW, this is a prod issue.
----
2019-05-31 01:15:56 UTC - Devin G. Bost: What's the quota number?
----
2019-05-31 01:16:10 UTC - Devin G. Bost: We're trying to figure out where the 
backlogged messages are located.
----
2019-05-31 01:25:34 UTC - David Kjerrumgaard: @Devin G. Bost What is the 
configured target topic of the producer that fails to be created? That would 
give an indication as to which namespace is exceeding its backlog quota
----
2019-05-31 01:27:38 UTC - David Kjerrumgaard: @Devin G. Bost Once you know the 
namespace, you can increase the backlog quota as needed with the `pulsar-admin 
namespaces set-backlog-quota tenant/namespace` command
----
2019-05-31 01:29:21 UTC - David Kjerrumgaard: by default the limit is 10GB and 
so you will want to increase the limit to 20+ GB, e.g. `pulsar-admin namespaces 
set-backlog-quota tenant/namespace --limit 20GB`
----
2019-05-31 01:30:09 UTC - David Kjerrumgaard: You can confirm the backlog quota 
using `pulsar-admin namespaces get-backlog-quotas tenant/namespace`
----
2019-05-31 01:47:32 UTC - Devin G. Bost: @David Kjerrumgaard Thanks for the 
feedback. We noticed that some of the functions were giving us 500 errors when 
we tried getting stats on them, and as we stopped and started the functions, we 
noticed data starting to flow again at parts of the pipeline.
----
2019-05-31 01:52:28 UTC - Devin G. Bost: @David Kjerrumgaard What's the default 
retention policy?
----
2019-05-31 01:52:36 UTC - Devin G. Bost: It seems that retention policy is a 
required parameter.
----
2019-05-31 01:55:54 UTC - Devin G. Bost: nvm. I got it from `get-backlog-quotas`
----
2019-05-31 02:11:00 UTC - Devin G. Bost: We can't get everything to flow 
through. Things will flow in pieces, but we can't get the entire flow working 
end-to-end. No clear reason why.
----
2019-05-31 02:21:51 UTC - David Kjerrumgaard: @Devin G. Bost Can you elaborate 
a bit on the flow itself, are all the topics in the same namespace?  What is 
the incoming data volume/rate?  Where are you seeing back-pressure in the flow, 
i.e. which topics have significant message backlog?
----
2019-05-31 02:28:08 UTC - Devin G. Bost: @David Kjerrumgaard
They're all on the same tenant. There are two flows, each in a different 
namespace. Each flow has one source, multiple functions, and one sink.
Incoming data rate on one source is in the thousands of messages per second. 
The backlog that we upped to 30 GB already filled up.

We see a cascade of backlog that decreases in volume as we move farther from 
the source.
----
2019-05-31 02:29:05 UTC - Devin G. Bost: So, the first topic closest to the 
source has the most backlog, then the topic just a step closer to the sink has 
less backlog, then the topic just a step closer to the sink from there has even 
less backlog, etc.
----
2019-05-31 02:30:44 UTC - David Kjerrumgaard: Can you increase the number of 
consumers from the first topic? It appears that the consumers cannot keep up 
with the producer
----
2019-05-31 02:31:02 UTC - David Kjerrumgaard: if it is a function try upping 
the --parallelism config
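
A rough way to size that setting is sketched below: compare the ingest rate with what one instance can process, and estimate how long a quota lasts while the backlog is still growing. The helper and every input value are hypothetical; real numbers would come from the topic and function stats.

```java
/** Back-of-the-envelope helper; all inputs are hypothetical measurements. */
public class BacklogMath {
    /** Smallest instance count whose combined throughput covers the ingest rate. */
    public static int requiredParallelism(double ingestPerSec, double perInstancePerSec) {
        return (int) Math.ceil(ingestPerSec / perInstancePerSec);
    }

    /** Seconds until the quota fills, or infinity if consumers keep up. */
    public static double secondsUntilQuotaFull(double quotaBytes,
                                               double backlogBytes,
                                               double ingestBytesPerSec,
                                               double drainBytesPerSec) {
        double growth = ingestBytesPerSec - drainBytesPerSec;
        if (growth <= 0) {
            return Double.POSITIVE_INFINITY;
        }
        return (quotaBytes - backlogBytes) / growth;
    }
}
```

For instance, at an assumed 5,000 msg/s in and 700 msg/s per instance, `requiredParallelism` returns 8; if fewer instances actually start, the backlog keeps growing no matter how large the quota is.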
----
2019-05-31 02:31:38 UTC - Devin G. Bost: We've been doing that for the last 
couple of hours...
----
2019-05-31 02:32:09 UTC - Thor Sigurjonsson: Part of it is that we seem to 
lose functions, and we are getting functions not starting too... We set 
parallelism to 8 and get 3, etc.
----
2019-05-31 02:32:09 UTC - David Kjerrumgaard: Do you have auto-ack configured?
----
2019-05-31 02:32:23 UTC - Devin G. Bost: We need to check.
----
2019-05-31 02:32:59 UTC - Devin G. Bost: Yes, it's set to true.
----
2019-05-31 02:33:51 UTC - David Kjerrumgaard: That's good.  I would also check 
the message out rate to see if the trend increases when you add consumers or 
not.
----
2019-05-31 02:34:29 UTC - David Kjerrumgaard: What is the subscription type? 
Hopefully not exclusive
----
2019-05-31 02:34:37 UTC - Thor Sigurjonsson: Yes, we've seen backlogs go down 
(to zero even) but the flow gets clogged again.
----
2019-05-31 02:34:50 UTC - Thor Sigurjonsson: default subscription type
----
2019-05-31 02:34:57 UTC - David Kjerrumgaard: so you have a "bursty" data flow?
----
2019-05-31 02:35:14 UTC - Devin G. Bost: It's like playing "whack-a-mole"
----
2019-05-31 02:35:18 UTC - Devin G. Bost: as we try getting things up.
----
2019-05-31 02:35:21 UTC - Thor Sigurjonsson: The system got clogged, and 
restarting it is the problem
----
2019-05-31 02:35:34 UTC - Thor Sigurjonsson: We lost a sink datastore and had 
churn on the servers in the night.
----
2019-05-31 02:35:42 UTC - Devin G. Bost: It started with a Redis sink out of 
memory error.
----
2019-05-31 02:36:08 UTC - David Kjerrumgaard: haha, yea fixing back-pressure is 
like plugging leaks in a pipe that has 100 leaks.  fix one and the other 99 get 
worse
----
2019-05-31 02:36:15 UTC - Thor Sigurjonsson: It seems we've gotten it started 
with just working through the flows one function at a time...
----
2019-05-31 02:36:22 UTC - Thor Sigurjonsson: yes
----
2019-05-31 02:36:30 UTC - Devin G. Bost: But then it dies again.
----
2019-05-31 02:36:55 UTC - David Kjerrumgaard: the Redis connector dies again?
----
2019-05-31 02:37:33 UTC - Thor Sigurjonsson: redis is happy but we're not sure 
-- seems the connector (functions, source) etc are all doing their own thing.
----
2019-05-31 02:37:45 UTC - Thor Sigurjonsson: source gets quota'd out after a 
while
----
