2020-02-18 09:24:57 UTC - Rahul: @Rahul has joined the channel
----
2020-02-18 09:58:56 UTC - Lars Norén: @Lars Norén has joined the channel
----
2020-02-18 11:15:48 UTC - Rolf Arne Corneliussen: @Rolf Arne Corneliussen has 
joined the channel
----
2020-02-18 11:52:23 UTC - Rolf Arne Corneliussen: Hello everyone,

I am new to Pulsar, and I am interested to see how it can solve a specific use 
case - *detection of*
*the absence of messages* from different sources. Let us say we have a 
substantial number of devices
that emit heartbeat messages at certain intervals, translated into a stream of 
heartbeat messages,
and we want to detect missing heartbeats. The devices may have individual 
heartbeat intervals.

Using _Kafka and Kafka Streams_, the problem could be solved using key value 
stores and setting up
_timer wheels_ for the devices belonging to a partition - in memory. However, 
this requires iterating
over the complete key value store when a partition is assigned to a consumer.

I have been looking at the Pulsar Functions API, and while you can get and put 
table entries, I could
not find any way to get all entries (nor the approximate size of a table). From 
the table service
API in BookKeeper, I can see there are methods to get a range, that is, iterate 
over a table given
the partition key. Have I missed something, or are there plans to _expose 
*ranges*_ to the Functions API?

Alternatively, maybe I could try to write a micro service that uses a Pulsar 
client to consume from a topic containing heartbeats,
in combination with using a BookKeeper client to access the table service? 
Would that be a viable solution? Or would it be an anti-pattern (to access 
Pulsar and BookKeeper in the same application)?
Or will the 'transactional' aspects be too complicated?

Any ideas or input are very welcome!
----
2020-02-18 16:51:15 UTC - Sree Vaddi: you all are cordially invited to our 
first meetup at a new company and this year’s first meetup in SF:
raffle prizes
<https://www.meetup.com/Apache-Heron-Bay-Area/events/nglzdrybcdbwb/>
----
2020-02-18 16:52:28 UTC - Sijie Guo: The ranges API is not exposed to 
functions yet. Feel free to create a GitHub issue requesting this 
feature.

I don’t think this requires a table or a persistent key/value store. 
You can just maintain an in-memory map, since heartbeats are bound to time. So 
you just need to keep a time window for it, no?
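
A minimal sketch of that in-memory idea, assuming a single shared time window for all devices and plain Python rather than the Pulsar Functions API. The class `LastSeenMap` and its method names are hypothetical, and the state is lost when the process restarts.

```python
import time


class LastSeenMap:
    """In-memory map of device id -> time of the most recent heartbeat."""

    def __init__(self, window_seconds, clock=time.monotonic):
        self.window_seconds = window_seconds
        self._clock = clock      # injectable clock (assumption, eases testing)
        self._last_seen = {}

    def record(self, device_id):
        """Call once per heartbeat message."""
        self._last_seen[device_id] = self._clock()

    def silent_devices(self):
        """Return, sorted, the devices with no heartbeat inside the window."""
        cutoff = self._clock() - self.window_seconds
        return sorted(d for d, seen in self._last_seen.items() if seen < cutoff)
```

Each call to `silent_devices()` scans the whole map, which is simple but grows linearly with the number of devices.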
----
2020-02-18 16:53:27 UTC - Sijie Guo: What command are you using?
----
2020-02-18 17:01:06 UTC - Atif: Thanks for responding sijieg!
----
2020-02-18 17:02:46 UTC - Atif: I could see we have the company banners listed 
in the documentation but would like to understand more about what use cases and 
what volume of data/ business slas/ feature sets these companies are using
----
2020-02-18 17:09:52 UTC - Sijie Guo: @Atif for the use cases, you can check - 
<https://streamnative.io/success-stories/>

for “volume of data/ business slas/ feature sets”, I don’t think most of the 
companies will share it publicly, especially about “volume of data” and 
“business slas”.
+1 : Atif
----
2020-02-18 17:10:07 UTC - Atif: I can't talk a lot unfortunately about our use 
case, but we're looking to use pulsar at a very large scale 1000-2000 
namespaces and topics. As of now we don't have a huge volume of data but there 
may be spikes in volume and we want to be able to scale up and down 
dynamically. We personally like Pulsar a lot and are inclined to use it but 
the team would like to hear success stories if they're there
----
2020-02-18 17:10:21 UTC - Atif: thanks
----
2020-02-18 17:10:46 UTC - Atif: this should do!
----
2020-02-18 17:12:34 UTC - Sijie Guo: >  a very large scale 1000-2000 
namespaces and topics.
That’s not a lot. I know a lot of companies running at a higher scale than 
this, and with higher data volumes.

The publicly available numbers that I have seen:

• Yahoo! - millions of topics/partitions, 100 billion messages / day
• Yahoo! JAPAN - 100 billion messages / day
• Tencent - 10 billion messages per day for the billing platform.
ok_hand : Atif
----
2020-02-18 17:15:10 UTC - Atif: thanks! this is extremely positive to hear
----
2020-02-18 17:25:45 UTC - Cody Poll: @Cody Poll has joined the channel
+1 : David Kjerrumgaard
----
2020-02-18 17:47:07 UTC - Sijie Guo: No problems. Happy to help! Feel free to 
reach out if you need anything more 
----
2020-02-18 17:58:49 UTC - Cody Poll: Good day :wave:

tl;dr - Does creating/deleting a subscription require writes to zookeeper?
---
I'm trying to understand the ways that Pulsar makes use of Zookeeper. My 
understanding from the documentation is that zookeeper is used for:

1. Information that must be globally consistent (namespaces, tenants, etc.) 
goes in the instance-level zookeeper
2. Cluster-level metadata (broker ownership of topics, load reports, and bookie 
management)
In the "Understanding How Apache Pulsar Works" blog post 
(<https://jack-vanlightly.com/blog/2018/10/2/understanding-how-apache-pulsar-works>), 
it says:
> Each subscription stores a cursor. The cursor is the current offset in the 
log. Subscriptions store their cursor in BookKeeper in ledgers. This makes 
cursor tracking scalable just like topics.
It sounds like there are a number of ways this could be implemented. My first 
reaction is to think that there is some log of cursor updates, meaning 
creating/deleting a subscription wouldn't require metadata writes in Zookeeper, 
only a lookup for the bookies to send it to. Is this correct?
----
2020-02-18 18:10:42 UTC - Addison Higham: @Cody Poll 
<https://streaml.io/blog/cursors-in-pulsar> <- walks through it
----
2020-02-18 18:12:58 UTC - Cody Poll: Thank you :bow:
----
2020-02-18 18:30:28 UTC - Cody Poll: OK - that makes sense.

1. For each subscription, a ledger is created to hold its current offset
2. Ledger location and replication metadata is managed in zookeeper
So creating/deleting a subscription will have metadata effects in zookeeper. 
:thumbsup_all:
+1 : Sijie Guo
----
2020-02-18 20:12:28 UTC - Mikhail Veygman: Hi.
wave : Cody Poll
----
2020-02-18 20:28:19 UTC - Mikhail Veygman: Sorry.  Got stuck.
----
2020-02-18 20:28:41 UTC - Mikhail Veygman: I am doing an async send to a topic 
but I am getting this:

Feb 18, 2020 3:22:55 PM org.apache.pulsar.shade.io.netty.util.HashedWheelTimer reportTooManyInstances
SEVERE: You are creating too many HashedWheelTimer instances. HashedWheelTimer 
is a shared resource that must be reused across the JVM,so that only a few 
instances are created.
----
2020-02-18 20:29:31 UTC - Mikhail Veygman: Is this because I am checking 
whether or not the Future is done?
----
2020-02-18 20:35:26 UTC - Sijie Guo: Are you creating too many clients?
----
2020-02-18 20:35:42 UTC - Mikhail Veygman: Quite possibly
----
2020-02-18 20:35:52 UTC - Mikhail Veygman: Let me try without it.
----
2020-02-18 21:29:29 UTC - Rolf Arne Corneliussen: Thanks for your input,

Interesting thoughts, which led me to read the 'Event Processing Design 
Patterns with Pulsar Functions' blog post on the streaml.io website, and also 
to look at some Window functions in the examples on GitHub. 
However, I am not sure this will provide a solution for this specific use case.

So I will try to be more specific about the requirements:
• The input topic contains records that identify the device and the heartbeat 
timestamp
• Each device has an individual configured heartbeat timeout interval, which 
could be anywhere between 10 seconds and 24 hours.
• Each device may emit a heartbeat when it wants to, so it may for example 
produce two heartbeats within the timeout window.
• The number of devices may be around 1M; the total number of heartbeats per 
second could be 10K+
• The function needs to detect missed heartbeats, and produce a message for 
that on an output topic. It should also detect missed heartbeats within one 
second. After detecting a missed heartbeat, the device status is set to 'dead' 
and we should not detect missed heartbeats for dead devices. If we receive a 
heartbeat for a dead device, we should produce a message on an output topic, 
and expect regular heartbeats.
• The function needs to work correctly after a machine/function instance crash, 
or spinning up a new instance of the function
My idea is to store, for each device, the timestamp of the last heartbeat, the 
device status (dead/alive) and the timestamp the device must send the next 
heartbeat (if alive). This is done both in memory and in some persistent way, 
so the process could be resumed in another process, on the same host or another 
host.

Using Kafka Streams, we can build the in memory data structure in the life 
cycle method callbacks (i.e. `init()` when a partition is assigned) by 
iterating over the key value store, and this is very efficient when the store 
is persisted with RocksDB. The structure is maintained by processing
each inbound message, one at a time. Devices that are timed out are detected 
in a _punctuation_ (a scheduled task).

In principle, if the heartbeat topic were compacted, we could create the 
in-memory structures by reading the topic from the beginning when a function is 
started, but this would take some time. Besides, I do not know how to initiate 
that in Pulsar Functions.

If the timeout for each device were the same, for example one minute, and if we 
could get a list of devices from another source, I can see that one could use a 
time window of one minute to decide which devices had emitted heartbeats and 
which devices were dead (albeit the size of the collection argument to the 
`process` method invocation would be huge?). However, given the above
requirements, it is not clear to me how to use a time window.

If access to table ranges were to be implemented for functions, I am not sure 
how to leverage that in an initialization method. Would one have to maintain a 
`boolean` in the
`process` method for example? Could the Function interface be extended with 
life cycle methods?

Also, would it be a good idea to try writing a micro-service that has a Pulsar 
client (to consume heartbeat topic) and a BookKeeper client to access a state 
store?
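
The bookkeeping described above (last heartbeat, dead/alive status and next deadline per device) can be sketched in memory as follows. This is only an illustration, not a Pulsar Functions or Kafka Streams API: the class `HeartbeatMonitor`, its method names and the event tuples are hypothetical, timestamps are plain seconds, heartbeats for one device are assumed to arrive in timestamp order, and persistence and recovery after a crash are left out.

```python
import heapq


class HeartbeatMonitor:
    """Tracks per-device deadlines in a min-heap with lazy invalidation."""

    def __init__(self, timeouts):
        self._timeouts = timeouts   # device id -> timeout in seconds
        self._deadline = {}         # device id -> current deadline (alive only)
        self._dead = set()          # devices that already missed a heartbeat
        self._heap = []             # (deadline, device id); may hold stale entries

    def on_heartbeat(self, device_id, timestamp):
        """Process one heartbeat; return the events to publish, if any."""
        events = []
        if device_id in self._dead:
            self._dead.discard(device_id)
            events.append(("revived", device_id, timestamp))
        deadline = timestamp + self._timeouts[device_id]
        self._deadline[device_id] = deadline
        heapq.heappush(self._heap, (deadline, device_id))
        return events

    def on_tick(self, now):
        """Scheduled task; return a "missed" event per deadline due by `now`."""
        events = []
        while self._heap and self._heap[0][0] <= now:
            deadline, device_id = heapq.heappop(self._heap)
            if self._deadline.get(device_id) != deadline:
                continue            # stale entry: a newer heartbeat replaced it
            del self._deadline[device_id]
            self._dead.add(device_id)
            events.append(("missed", device_id, deadline))
        return events
```

Calling `on_tick` about once per second from a scheduled task would roughly correspond to the one-second detection requirement. Each heartbeat pushes a new heap entry instead of updating the old one, so stale entries stay in the heap until their deadline passes; that keeps every operation at O(log n) at the cost of extra memory.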
----
2020-02-18 22:31:41 UTC - Tanner Nilsson: @Tanner Nilsson has joined the channel
----
2020-02-18 22:50:32 UTC - Kirill Podkov: @Kirill Podkov has joined the channel
----
2020-02-19 01:08:45 UTC - Sérgio Silveira: @Sérgio Silveira has joined the 
channel
----
2020-02-19 02:55:56 UTC - Rattanjot Singh: @Rattanjot Singh has joined the 
channel
----
2020-02-19 03:00:40 UTC - Billy Yuan: @Billy Yuan has joined the channel
----
2020-02-19 03:01:15 UTC - Rattanjot Singh: Getting this error when trying to 
produce a message in pulsar broker

Python Code
```import pulsar

# Create a Pulsar client instance. The instance can be shared across multiple
# producers and consumers
client = pulsar.Client('pulsar://localhost:6650')

# Create a producer on the topic. If the topic doesn't exist
# it will be automatically created
producer = client.create_producer(
    'persistent://sample/standalone/ns1/my-topic')

for i in range(10):
    content = 'hello-pulsar-%d' % i
    # Publish a message and wait until it is persisted.
    # On Python 3, send() expects bytes, so encode the string first.
    producer.send(content.encode('utf-8'))
    print('Published message: "%s"' % content)

client.close()```
ERROR
```2020-02-19 08:23:48.021 INFO  ConnectionPool:72 | Created connection for pulsar://localhost:6650
2020-02-19 08:23:48.024 ERROR ClientConnection:374 | [<none> -> pulsar://localhost:6650] Failed to establish connection: Connection refused
2020-02-19 08:23:48.024 INFO  ClientConnection:1337 | [<none> -> pulsar://localhost:6650] Connection closed
2020-02-19 08:23:48.024 ERROR ClientImpl:182 | Error Checking/Getting Partition Metadata while creating producer on persistent://sample/standalone/ns1/my-topic -- 5
2020-02-19 08:23:48.024 INFO  ClientConnection:229 | [<none> -> pulsar://localhost:6650] Destroyed connection
Traceback (most recent call last):
  File "produce_message.py", line 10, in <module>
    'persistent://sample/standalone/ns1/my-topic').enableTls(true)
  File "/usr/local/lib/python3.7/site-packages/pulsar/__init__.py", line 476, in create_producer
    p._producer = self._client.create_producer(topic, conf)
Exception: Pulsar error: ConnectError```
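
The log above reports `Connection refused` for `localhost:6650`, which usually means nothing is listening on that port from where the script runs. One quick way to check this before creating the client is a plain TCP probe. This is a minimal sketch using only the Python standard library; `broker_reachable` is a hypothetical helper name, not part of the Pulsar client.

```python
import socket


def broker_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError on refusal, timeout or DNS failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For the setup in this message, the call would be `broker_reachable("localhost", 6650)`. A `False` result points at the broker not running or the port not being exposed, rather than at the producer code.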

----
2020-02-19 05:46:35 UTC - Alexander Ursu: what do the `triage/week-X` issue 
labels mean on the apache/pulsar github?
----
2020-02-19 05:50:04 UTC - Manju Priya A R: What is the maximum acknowledgment 
timeout that can be set for a Consumer? I tried setting the max possible long 
value "9223372036854775807" and am observing the exception below:
11:18:29.613 [pulsar-timer-4-1] WARN org.apache.pulsar.shade.io.netty.util.HashedWheelTimer - An exception was thrown by TimerTask.
java.util.NoSuchElementException: null
        at java.util.ArrayDeque.removeFirst(ArrayDeque.java:285)
        at org.apache.pulsar.client.impl.UnAckedMessageTracker$2.run(UnAckedMessageTracker.java:130)
        at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:680)
        at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:755)
        at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:483)
        at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
----
2020-02-19 05:57:11 UTC - Sijie Guo: the label is used for triaging issues, so 
we know there are people looking into these issues to some extent
----
2020-02-19 06:19:52 UTC - Gautam: @Gautam has joined the channel
----
2020-02-19 06:24:45 UTC - Ken Huang: @Ken Huang has joined the channel
----
2020-02-19 06:57:14 UTC - Ken Huang: Hi all, I want to enable TLS in Pulsar, 
following <http://pulsar.apache.org/docs/en/security-tls-transport/>
```from pulsar import Client
from pulsar.schema import StringSchema

client = Client("pulsar+ssl://localhost:6651/",
                tls_trust_certs_file_path="/pulsar/my-ca/certs/ca.cert.pem",
                tls_allow_insecure_connection=False)
producer = client.create_producer('persistent://public/default/topic',
                                  schema=StringSchema())```
I get the exception: Pulsar error: ConnectError
I use Docker to run Pulsar
```docker run -d -it -p 6650:6650 -p 8080:8080 -p 6651:6651 -p 8081:8081 -p 
8443:8443 -v D:/Documents/Docker/pulsar_manager/data:/pulsar/data --name 
pulsar-manager-standalone1 pulsar bin/pulsar standalone ```
What should I do? Thanks a lot.
----
2020-02-19 08:02:04 UTC - Ismail Hassan: @Ismail Hassan has joined the channel
----
2020-02-19 08:39:53 UTC - Manuel Mueller: @Manuel Mueller has joined the channel
----
