2019-11-11 09:12:37 UTC - leonidv: Thanks! It's exactly what I was looking for! Great!
----
2019-11-11 09:12:50 UTC - leonidv: I'm talking about subscriptionInitialPosition
----
2019-11-11 09:48:50 UTC - Gopi Krishna: Hi, I am trying to run Pulsar across two
different servers. In my current scenario I produce messages
using NiFi, as shown in this blog
(<https://streaml.io/blog/intro-to-nifi-processors>). The major difference in
my case is that I want to consume the messages using pulsar-client on a
different server. I think this should be possible, but I couldn't
get any further. Any help will be highly appreciated.
----
2019-11-11 09:56:43 UTC - Sijie Guo: Once the NiFi processors produce the
messages to a Pulsar topic, you can use a consumer to receive messages from that
topic on the other server. Alternatively, you can write a Pulsar Function to
process the messages.
----
2019-11-11 09:59:40 UTC - Gopi Krishna: OK, but I am unsure how the
consumer on the other server knows where to connect. Should we provide the URL
to the NiFi processor? If so, where would I find it, and how should it be specified?
----
2019-11-11 10:02:05 UTC - Sijie Guo: you need to provide the Pulsar service URL
to the consumer you use on the other server.
----
2019-11-11 10:02:35 UTC - Sijie Guo: for how to use a consumer in Java, you can
check : <http://pulsar.apache.org/docs/en/client-libraries-java/>
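A minimal sketch of the consumer side in Python (the Java client linked above follows the same pattern). It assumes the `pulsar-client` package is installed on the consuming server and that the broker listens on the default binary port 6650. The helper names `service_url` and `consume` are hypothetical and not part of the client library.

```python
def service_url(host, port=6650):
    """Build the binary-protocol service URL the client connects to."""
    return "pulsar://{0}:{1}".format(host, port)


def consume(url, topic, subscription, max_messages=10):
    """Receive and acknowledge up to max_messages from the given topic."""
    import pulsar  # assumption: installed via `pip install pulsar-client`

    client = pulsar.Client(url)
    consumer = client.subscribe(topic, subscription_name=subscription)
    try:
        for _ in range(max_messages):
            msg = consumer.receive()       # blocks until a message arrives
            print("received:", msg.data())
            consumer.acknowledge(msg)      # individual ack for this message
    finally:
        client.close()
```

With a hypothetical broker host, `consume(service_url("broker.example.com"), "nifi-output", "remote-consumer")` would read from the same topic the NiFi processor writes to; the topic and subscription names here are placeholders.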
----
2019-11-11 10:07:05 UTC - tuteng: You can try:
from distutils.util import strtobool
from pulsar import Function, SerDe
from pulsar.schema import Record, String, Integer, Boolean

class Message(Record):
    a = String()
    b = Integer()
    c = Boolean()

    def __init__(self, a, b, c):
        self.a = a
        self.b = int(b)
        self.c = bool(strtobool(c))

class MessageSerde(SerDe):
    def serialize(self, input):
        # Encode the three fields as a comma-separated UTF-8 string.
        return "{0},{1},{2}".format(input.a, input.b, input.c).encode("utf-8")

    def deserialize(self, input_bytes):
        # Decode first; str() on bytes would yield "b'...'" on Python 3.
        components = input_bytes.decode("utf-8").split(",")
        return Message(*components)

class PulsarFunction(Function):
    def process(self, input, context):
        logger = context.get_logger()
        logger.info("----- got message -----")
        # str.join takes a single iterable of strings.
        fields = [str(input.a), str(input.b), str(input.c)]
        logger.info(", ".join(fields))
        return "/".join(fields)
+1 : Jasper Li
----
2019-11-11 10:07:15 UTC - tuteng: reference:
<https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/custom_object_function.py>
+1 : Jasper Li
----
2019-11-11 10:07:55 UTC - tuteng: The example at
<http://pulsar.apache.org/docs/en/functions-develop/#example> is incorrect; we will fix it
+1 : Jasper Li
----
2019-11-11 10:09:32 UTC - tuteng: Messages should come from the input topic in
serialize, not be assigned at the beginning of initialization
+1 : Jasper Li
----
2019-11-11 10:37:11 UTC - Naveen Kumar: @Jasper Li Okay. I'll check by that
process.
----
2019-11-11 11:52:39 UTC - Naveen Kumar: @tuteng ./pulsar-admin sources create
--source-config-file ./user_conf/debezium-postgres-topic.yml
----
2019-11-11 11:53:37 UTC - Naveen Kumar: connector source config file
----
2019-11-11 11:59:21 UTC - Naveen Kumar: @Jasper Li I had some trouble adding
<https://mvnrepository.com/artifact/io.confluent/kafka-connect-avro-converter/5.2.1>
as a Maven dependency because it's located in the ICM repo (specified in the Note
section), but it's not available in the ICM repo either.
So I downloaded the Avro converter from Confluent Hub, extracted the zip,
copied only kafka-connect-avro-converter-5.3.1.jar to the pulsar directory, and
specified the dependency as
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-connect-avro-converter</artifactId>
  <version>5.2.1</version>
  <scope>system</scope>
  <systemPath>${project.basedir}/local-maven-repo/kafka-connect-avro-converter-5.3.1.jar</systemPath>
</dependency>
But the generated nar file still doesn't contain the above jar.
----
2019-11-11 12:01:59 UTC - Naveen Kumar: The dependency is added in pom.xml of
pulsar-io-debezium-postgres module
----
2019-11-11 12:49:30 UTC - Jasper Li: @tuteng Got it finally! Cool to have
that!!! Thank you very much!
----
2019-11-11 13:25:24 UTC - Naveen Kumar: IGNORE above problems related to pom.xml
----
2019-11-11 14:49:58 UTC - Pedro Cardoso: Hello, can pulsar functions access the
BookKeeper's ledger or stream APIs?
----
2019-11-11 14:52:06 UTC - Britt Bolen: one must also set up a keystore for the
zts client in order for it to use the truststore. I just created an empty
keystore.
----
2019-11-11 14:52:36 UTC - Retardust: Is there any out-of-the-box way to send
acks asynchronously but with ordering guarantees?
----
2019-11-11 14:56:55 UTC - Jasper Li: @Naveen Kumar I am not sure whether I am
correct, since this was done by my colleague; I will confirm it tomorrow.
If I understood correctly, the dependency should be added here:
<https://github.com/apache/pulsar/blob/branch-2.4/pulsar-io/debezium/mysql/pom.xml>
and you can then build a nar with the Confluent converter for
Pulsar. (We ran into another issue after building it, though:
`io.confluent.connect.avro.AvroConverter` requires the Confluent schema registry,
and at this stage we can only send messages to that schema registry.) :persevere:
----
2019-11-11 15:26:37 UTC - Pedro Cardoso: What is the difference between
defining a Pulsar Schema as `JSONSchema.of(User.class)` vs
`Schema.JSON(User.class);` ?
----
2019-11-11 16:32:14 UTC - Devin G. Bost: Is there a way to create redundant
topics? We have recurring issues where a topic will randomly “freeze,” which
will result in backlog accumulation and downstream functions receiving no data.
The only resolution is to restart the broker where the frozen topic is living.
----
2019-11-11 16:32:52 UTC - Devin G. Bost: We’ve been having this same issue for
months, and we still haven’t received much guidance on it.
----
2019-11-11 16:34:58 UTC - Addison Higham: that ruby client is very out of date,
there is a WIP new ruby client based on C++ here:
<https://github.com/instructure/pulsar-client-ruby>
----
2019-11-11 16:35:21 UTC - Addison Higham: it is being contributed back to
pulsar, it works for basic use cases ATM
----
2019-11-11 18:31:36 UTC - Sijie Guo: Does “freeze” mean that consuming stops? Which
version of the broker are you using?
----
2019-11-11 18:32:24 UTC - Sijie Guo: Can you explain a bit of the idea of
redundant topics?
----
2019-11-11 18:33:01 UTC - Sijie Guo: Order of acks?
----
2019-11-11 18:33:17 UTC - Sijie Guo: They are effectively the same
----
2019-11-11 18:33:57 UTC - Sijie Guo: Currently not yet. We can consider exposing
it if there are use cases.
----
2019-11-11 18:35:41 UTC - Pedro Cardoso: I need access to the ledger API in
order to perform sliding window operations (with a per-event slide) which is
currently not possible. Is connecting to the underlying bookkeeper cluster when
creating a pulsar function feasible?
----
2019-11-11 18:41:30 UTC - Matteo Merli: Oh that's really out of my Athenz
experience depth. Pinging @Nozomi Kurihara or @Masahiro Sakamoto since they're
actually using Pulsar with Athenz
----
2019-11-11 18:50:29 UTC - Joshua Dunham: Hi Everyone, I'm creating some
relatively custom containers for Pulsar. Since Pulsar uses bookkeeper over
ports will it kill performance if I have separate containers for each?
----
2019-11-11 18:51:51 UTC - Matteo Merli: No, that's the normal deployment
option. A pulsar broker will typically send the data to multiple bookkeeper
nodes anyway
----
2019-11-11 18:57:01 UTC - Joshua Dunham: Sounds great. Any value in stripping
out the libs for the other apps / services?
----
2019-11-11 18:59:37 UTC - Matteo Merli: For example?
----
2019-11-11 19:38:53 UTC - Retardust: Yep. If I ack the 2nd message and
fail to ack the first, will Pulsar send me the first one again or not? This is
in failover mode. From an initial sequence of messages a, b, will I end up with
b, a, b in the downstream topic or not?
----
2019-11-11 20:03:25 UTC - Joshua Dunham: libs ~
org.apache.bookkeeper-stream-storage-java-client-base-4.9.2.jar
----
2019-11-11 20:03:45 UTC - Joshua Dunham: ok, maybe not the bookkeeper client.
:smile:
----
2019-11-11 20:05:30 UTC - Joshua Dunham: The libs seem like they support
running the various services which can be activated by setting the pulsar
'mode' (./bin/pulsar broker vs. ./bin/pulsar bookie vs. etc)
----
2019-11-11 21:31:31 UTC - Joshua Dunham: Hey Everyone,
----
2019-11-11 21:32:15 UTC - Joshua Dunham: If I have a highly nested URL for
producers, /1/2/3/4/5, how is the namespace calculated? Is it /1/ or everything
before the topic?
----
2019-11-11 21:34:07 UTC - Matteo Merli: the topic itself cannot contain `/`
characters
----
2019-11-11 21:34:33 UTC - Joshua Dunham: In my long example what would the
tenant, namespace, and topic be?
----
2019-11-11 21:34:35 UTC - Matteo Merli: so, the form will always be:
`persistent://TENANT/NAMESPACE/TOPIC`
----
2019-11-11 21:34:56 UTC - Matteo Merli: when passing a namespace name, you'd
need to pass `TENANT/NAMESPACE`
----
2019-11-11 21:35:47 UTC - Joshua Dunham: I adapted the python example and this
works.
----
2019-11-11 21:35:51 UTC - Joshua Dunham:
producer = client.create_producer('persistent://1/2/3/4/5/6/7')
----
2019-11-11 21:36:11 UTC - Joshua Dunham: (I've replaced numbers with some other
hierarchical info)
----
2019-11-11 21:36:43 UTC - Joshua Dunham: But of course there are different
management tools (auth etc) that operate on the different levels.
----
2019-11-11 21:36:56 UTC - Joshua Dunham: I'm just not sure where Pulsar draws
the lines if I provide so many.
----
2019-11-11 21:39:46 UTC - Matteo Merli: So, there's a bit of a long story
:slightly_smiling_face:
Before Pulsar 2.0, we had naming like:
`persistent://TENANT/CLUSTER/NAMESPACE/TOPIC`
We removed cluster from the naming, but now we had to disallow `/` characters
(which were previously allowed in v1.x) because we wouldn't be able to
distinguish between old 1.x naming vs 2.x naming. (and we need to keep
supporting the old topic names for existing deployments).
----
2019-11-11 21:40:07 UTC - Matteo Merli: In your case,
`persistent://1/2/3/4/5/6/7` will be interpreted as :
----
2019-11-11 21:40:32 UTC - Matteo Merli: tenant: `1`
cluster: `2`
namespace: `1/2/3`
----
2019-11-11 21:40:42 UTC - Matteo Merli: Meaning a v1.x style topic
----
2019-11-11 21:42:41 UTC - Joshua Dunham: hmm
----
2019-11-11 21:44:45 UTC - Joshua Dunham: I'll read more on these and the
related management tools to see how it would fit my use case using fewer
hierarchical elements.
----
2019-11-11 21:45:04 UTC - Joshua Dunham: I don't (think) I need the concept of
tenants.
----
2019-11-11 21:45:48 UTC - Joshua Dunham: What I was going for is a sort of
narrowing scope until some midpoint as a unique domain and then expanding on
that for exact topics (to support schemas etc).
----
2019-11-11 21:46:36 UTC - Joshua Dunham: so like
/base/applications/app1/{app1_logging, app1_metrics, app1_current_task}
----
2019-11-11 21:46:57 UTC - Joshua Dunham: If you consider the { } as brace
expansion.
----
2019-11-11 21:47:35 UTC - Joshua Dunham: I would have multiple applications (as
producers) committing messages to each 'sub-topic' (with set schemas).
----
2019-11-11 21:47:54 UTC - Joshua Dunham: Do you know of docs or writeups for
this sort of strategy?
----
2019-11-11 22:07:25 UTC - Devin G. Bost: We’re running 4.2.0 on our brokers.
“Freeze” means that all consumers stop getting output from the topics on the
particular broker involved.
I’m just trying to find a workaround so that when this happens, we have a way
to ensure that we don’t have outages.
----
2019-11-11 23:42:30 UTC - Derek Rhodehamel: Hello,
I'm trying to use the pulsar go client library in an alpine docker container.
The issue is that I do not see a way to install the CPP bindings since an
alpine apk is not mentioned in the docs.
Is there a known way to either build an alpine apk or use one of the existing
builds in an alpine container?
----
2019-11-11 23:44:28 UTC - Ali Ahmed: @Dean Anderson you can compile the pulsar libs
statically on any Linux machine and copy them into the target container
+1 : Derek Rhodehamel
----
2019-11-11 23:51:19 UTC - Matteo Merli: If you're not scared easily, you can
take a look at the WIP in <https://github.com/apache/pulsar-client-go>
----
2019-11-11 23:52:11 UTC - Matteo Merli: basic producer is mostly working,
consumer is getting there
----
2019-11-12 00:25:40 UTC - Sijie Guo: Are you using functions to consume the
messages? Or do you also use Java consumers in your services?
Consumers no longer getting output from the topics looks like a problem we have
seen with other users. It was related to permit issues in Pulsar's flow control
logic. We are still working on a fix, but it can usually be resolved by
unloading the topics or the namespace. Have you tried unloading before?
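A minimal sketch of scripting the unload workaround, assuming the `pulsar-admin` CLI is on the PATH and already configured to reach the cluster. It relies on the `topics unload` and `namespaces unload` subcommands; the helper names and the example topic are hypothetical.

```python
import subprocess


def unload_command(target, admin_bin="pulsar-admin"):
    """Return the argv for unloading a topic or a namespace.

    A target containing "://" is treated as a topic name; anything else
    is treated as a TENANT/NAMESPACE pair.
    """
    kind = "topics" if "://" in target else "namespaces"
    return [admin_bin, kind, "unload", target]


def unload(target, admin_bin="pulsar-admin"):
    """Run the unload command and raise if pulsar-admin reports an error."""
    subprocess.run(unload_command(target, admin_bin), check=True)
```

For example, `unload("persistent://public/default/frozen-topic")` unloads a single topic, while `unload("public/default")` unloads every topic in that namespace.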
----
2019-11-12 00:27:57 UTC - Sijie Guo: Do you want to create ledgers for storing
sliding windows? or you want to access the ledgers of a topic to perform
sliding windows?
If you can provide more details about this use case, I can help figure out the
right approach for you.
----
2019-11-12 00:30:38 UTC - Sijie Guo: > If I ack the 2nd message and
fail to ack the first, will Pulsar send me the first one again or not?
If you are using individual acks, it will.
If you are using cumulative acks, it will not.
You don’t need ordering when using individual acks, since the broker tracks
individual acks and only redelivers the unacked messages. You also don’t need
ordering when using cumulative acks, since the broker eventually tracks only the
highest watermark (the cursor position).
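The difference can be illustrated with a toy model. This is not how the broker is implemented; `ToyCursor` is a hypothetical class that only mimics the two rules described above. In the Python client the corresponding calls are `consumer.acknowledge(msg)` and `consumer.acknowledge_cumulative(msg)`.

```python
class ToyCursor:
    """Toy model of a subscription cursor over message ids 0..n-1."""

    def __init__(self, n):
        self.n = n
        self.watermark = -1              # highest id acked cumulatively
        self.individually_acked = set()  # ids acked one by one

    def ack(self, msg_id):
        """Individual ack: only this message is marked as done."""
        self.individually_acked.add(msg_id)

    def ack_cumulative(self, msg_id):
        """Cumulative ack: everything up to msg_id is marked as done."""
        self.watermark = max(self.watermark, msg_id)

    def unacked(self):
        """Ids that would still be eligible for redelivery."""
        return [i for i in range(self.n)
                if i > self.watermark and i not in self.individually_acked]
```

With two messages, acking only the second individually leaves the first eligible for redelivery, whereas a cumulative ack on the second covers both, regardless of the order in which the acks arrive.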
----
2019-11-12 05:55:47 UTC - sundar: My friend has created another issue on github
for the same issue..could you respond there
<https://github.com/apache/pulsar/issues/5619>
----
2019-11-12 05:57:47 UTC - sundar: Can you also tell me how to wipe the existing
Pulsar configuration and set the whole thing up again? If we run the metadata
command again, it says the Pulsar cluster is already present, but if I run the
pulsar-admin command to view the existing clusters, it does not work, so we
don't know what to do. Could you please help us? We're college students and we
don't know how to proceed.
Thanks
----
2019-11-12 06:01:06 UTC - Raghavi: Hi,
Can someone explain how to configure Avro as the schema type for Pulsar IO
with the Postgres Debezium CDC connector? When I set the "schemaType" property in
the connector configuration, I get "AvroTypeException unknown type K". Can
someone point me to documentation or a blog post explaining this in detail?
----
2019-11-12 06:41:37 UTC - Sijie Guo: @Raghavi Are you planning to consume the
topics produced by cdc connector?
----
2019-11-12 06:41:51 UTC - Sijie Guo: Or what are you planning to do ?
----
2019-11-12 06:44:19 UTC - Sijie Guo: commented in the issue.
----
2019-11-12 06:44:46 UTC - sundar: Thanks will see immediately
----
2019-11-12 06:44:58 UTC - Sijie Guo: you can stop the cluster, delete `data`
directory and re-setup the cluster.
----
2019-11-12 06:59:45 UTC - sundar: We setup the cluster using the initialize
metadata command but when we use the pulsar admin command it is coming like this
----
2019-11-12 07:02:45 UTC - Sijie Guo: you need to set up the whole cluster before
you can use `pulsar-admin`, because pulsar-admin talks to the brokers via port 8080
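One way to check whether a broker is answering on its HTTP port is to query the admin REST endpoint that lists clusters. A minimal sketch using only the standard library; it assumes the broker's web service runs on port 8080 without TLS or authentication, and the helper names are hypothetical.

```python
import json
import urllib.request


def clusters_url(host, port=8080):
    """URL of the admin REST endpoint that lists the configured clusters."""
    return "http://{0}:{1}/admin/v2/clusters".format(host, port)


def list_clusters(host, port=8080, timeout=5):
    """Return the cluster names reported by the broker.

    Raises an OSError subclass (for example URLError) if no broker is
    listening, which is the case when only the metadata was initialized.
    """
    with urllib.request.urlopen(clusters_url(host, port), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

If `list_clusters("localhost")` fails with a connection error, the broker is most likely not running yet, which would explain why `pulsar-admin` cannot list clusters either.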
----
2019-11-12 07:05:05 UTC - Raghavi: Our goal is to capture changes from one
Postgres source and persist the changes to another Postgres destination. This
should be done with a defined schema for each table and should be able
to upgrade the schema whenever there is a change in the source. So I'm looking to
work with Avro schemas, which is where I'm stuck.
----
2019-11-12 07:05:49 UTC - Raghavi: Using Postgres debezium source connector and
jdbc sink connector
----
2019-11-12 07:06:46 UTC - Retardust: But there is a tricky part with negative
acks. For example, I send 3 messages a, b, c downstream asynchronously. They are
batched by the Pulsar producer, and I have 3 callbacks. Each callback acks
cumulatively on success and sends a negative ack when sending fails. Could I get
into a situation where a is acked, b is negatively acked because its send
failed, and the ack for c then overrides that negative ack?
----
2019-11-12 07:11:42 UTC - Sijie Guo: if they are individual acks, I don’t see
there is a problem. they are different messages, no?
----
2019-11-12 07:19:58 UTC - Pradeep Mishra: ok, do we have any plan to add plugin
in fluentd based on this lib?
----
2019-11-12 07:31:42 UTC - sundar: What do you mean by setting up the whole
cluster? Is it setting up :
1. Zookeeper
----
2019-11-12 07:31:51 UTC - sundar: 2. Bookkeeper
3. Broker
----
2019-11-12 07:32:44 UTC - sundar: Also, if the DNS is a combination of hosts
like <pulsar://ip1:port,ip2:port>, how will we access this from the client?
----