2019-11-11 09:12:37 UTC - leonidv: Thanks! It's exactly what I was looking for! Great!
----
2019-11-11 09:12:50 UTC - leonidv: I'm asking about subscriptionInitialPosition
----
2019-11-11 09:48:50 UTC - Gopi Krishna: Hi, I am trying to run Pulsar across two 
different servers. My current scenario is that I will be producing messages 
using NiFi, as shown in this blog 
(<https://streaml.io/blog/intro-to-nifi-processors>). The major change in 
my case is that I want to consume the messages using pulsar-client on a 
different server. I think this kind of setup is possible, but I couldn't 
proceed any further. Any help will be highly appreciated.
----
2019-11-11 09:56:43 UTC - Sijie Guo: Once the NiFi processors produce the 
messages to a Pulsar topic, you can use a consumer to receive messages from that 
topic on the other server. Alternatively, you can write a Pulsar Function to 
process the messages as well.
----
2019-11-11 09:59:40 UTC - Gopi Krishna: Ok, but I have a doubt about how the 
consumer on the other server knows about that. Should we provide the NiFi 
processor with the URL? If so, where would I find it, and what should that URL 
be specified as?
----
2019-11-11 10:02:05 UTC - Sijie Guo: you need to provide the pulsar service url 
to the consumer you use on the other server.
----
2019-11-11 10:02:35 UTC - Sijie Guo: for how to use a consumer in Java, you can 
check : <http://pulsar.apache.org/docs/en/client-libraries-java/>
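A minimal sketch in Python of the same idea (the Java client works the same way); 
the service URL, topic, and subscription name below are placeholders:

    import pulsar

    # Point the client at the Pulsar cluster the NiFi processor produces to
    client = pulsar.Client('pulsar://your-broker-host:6650')

    # Subscribe to the topic the NiFi processor writes to
    consumer = client.subscribe('persistent://public/default/nifi-topic',
                                'my-subscription')

    while True:
        msg = consumer.receive()
        print("Received: %s" % msg.data())
        consumer.acknowledge(msg)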
----
2019-11-11 10:07:05 UTC - tuteng: You can try: from pulsar import Function
from distutils.util import strtobool
from pulsar import SerDe
from pulsar.schema import (
    Record, String, Integer, Boolean
)


class Message(Record):
    a = String()
    b = Integer()
    c = Boolean()

    def __init__(self, a, b, c):
        self.a = a
        self.b = int(b)
        self.c = bool(strtobool(c))


class MessageSerde(SerDe):
    def __init__(self):
        pass

    def serialize(self, input):
        # Encode the record fields as a comma-separated byte string
        return "{0},{1},{2}".format(
            input.a, input.b, input.c).encode('utf-8')

    def deserialize(self, input_bytes):
        # Decode the raw bytes coming from the input topic back into a Message
        components = input_bytes.decode('utf-8').split(",")
        return Message(*components)


class PulsarFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        logger = context.get_logger()
        logger.info("----- got message -----")
        logger.info("%s" % ", ".join([input.a, str(input.b), str(input.c)]))
        return "/".join([input.a, str(input.b), str(input.c)])
+1 : Jasper Li
----
2019-11-11 10:07:15 UTC - tuteng: reference: 
<https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/custom_object_function.py>
+1 : Jasper Li
----
2019-11-11 10:07:55 UTC - tuteng: This is incorrect 
<http://pulsar.apache.org/docs/en/functions-develop/#example>, we will fix it
+1 : Jasper Li
----
2019-11-11 10:09:32 UTC - tuteng: Messages should come from the input topic in 
serialize, not be assigned at the beginning of initialization
+1 : Jasper Li
----
2019-11-11 10:37:11 UTC - Naveen Kumar: @Jasper Li Okay. I'll check with that 
process.
----
2019-11-11 11:50:33 UTC - Kevin.Chen: @Kevin.Chen has joined the channel
----
2019-11-11 11:52:39 UTC - Naveen Kumar: @tuteng ./pulsar-admin sources create 
--source-config-file ./user_conf/debezium-postgres-topic.yml
----
2019-11-11 11:53:37 UTC - Naveen Kumar: connector source config file
----
2019-11-11 11:59:21 UTC - Naveen Kumar: @Jasper Li I had some trouble adding 
<https://mvnrepository.com/artifact/io.confluent/kafka-connect-avro-converter/5.2.1>
 as a maven dependency because it's located in the ICM repo (specified in the Note 
section), but it's not available in the ICM repo either.

So I downloaded the avro converter from Confluent Hub, extracted the zip, 
copied only kafka-connect-avro-converter-5.3.1.jar to the pulsar directory, and 
specified the dependency as

    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-connect-avro-converter</artifactId>
      <version>5.2.1</version>
      <scope>system</scope>
      <systemPath>${project.basedir}/local-maven-repo/kafka-connect-avro-converter-5.3.1.jar</systemPath>
    </dependency>

But the generated nar file still doesn't contain the above jar.
----
2019-11-11 12:01:59 UTC - Naveen Kumar: The dependency is added in the pom.xml 
of the pulsar-io-debezium-postgres module
----
2019-11-11 12:49:30 UTC - Jasper Li: @tuteng Got it finally! Cool to have 
that!!! Thank you very much!
----
2019-11-11 13:25:24 UTC - Naveen Kumar: IGNORE above problems related to pom.xml
----
2019-11-11 14:49:58 UTC - Pedro Cardoso: Hello, can pulsar functions access the 
BookKeeper's ledger or stream APIs?
----
2019-11-11 14:52:06 UTC - Britt Bolen: one must also set up a keystore for the 
zts client in order for it to use the truststore.  I just created an empty 
keystore.
----
2019-11-11 14:52:36 UTC - Retardust: Is there any out-of-the-box way to send 
acks async but with ordering guarantees?
----
2019-11-11 14:56:55 UTC - Jasper Li: @Naveen Kumar I am not sure if I am 
correct, since this was done by my colleague and I will confirm it tomorrow, 
but it should be added here: 
<https://github.com/apache/pulsar/blob/branch-2.4/pulsar-io/debezium/mysql/pom.xml>
 if I got it correctly, and you can build a nar with the confluent converter for 
pulsar (but we ran into another issue after we built it, since 
`io.confluent.connect.avro.AvroConverter` requires the confluent schema registry 
and we can only send messages to that schema registry at this stage). :persevere:
----
2019-11-11 15:26:37 UTC - Pedro Cardoso: What is the difference between 
defining a Pulsar Schema as `JSONSchema.of(User.class)` vs 
`Schema.JSON(User.class);` ?
----
2019-11-11 16:32:14 UTC - Devin G. Bost: Is there a way to create redundant 
topics? We have recurring issues where a topic will randomly “freeze,” which 
will result in backlog accumulation and downstream functions receiving no data. 
The only resolution is to restart the broker where the frozen topic is living.
----
2019-11-11 16:32:52 UTC - Devin G. Bost: We’ve been having this same issue for 
months, and we still haven’t received much guidance on it.
----
2019-11-11 16:34:58 UTC - Addison Higham: that ruby client is very out of date, 
there is a WIP new ruby client based on C++ here: 
<https://github.com/instructure/pulsar-client-ruby>
----
2019-11-11 16:35:21 UTC - Addison Higham: it is being contributed back to 
pulsar, it works for basic use cases ATM
----
2019-11-11 17:07:14 UTC - Nathan Mills: @Nathan Mills has joined the channel
----
2019-11-11 18:31:36 UTC - Sijie Guo: Freeze means stopping consuming? What 
version of broker are you using?
----
2019-11-11 18:32:24 UTC - Sijie Guo: Can you explain a bit of the idea of 
redundant topics?
----
2019-11-11 18:33:01 UTC - Sijie Guo: Order of acks?
----
2019-11-11 18:33:17 UTC - Sijie Guo: They are effectively the same
----
2019-11-11 18:33:57 UTC - Sijie Guo: Currently not yet. We can consider exposing 
it if there are use cases.
----
2019-11-11 18:35:41 UTC - Pedro Cardoso: I need access to the ledger API in 
order to perform sliding window operations (with a per-event slide) which is 
currently not possible. Is connecting to the underlying bookkeeper cluster when 
creating a pulsar function feasible?
----
2019-11-11 18:41:30 UTC - Matteo Merli: Oh that's really out of my Athenz 
experience depth. Pinging @Nozomi Kurihara or @Masahiro Sakamoto since they're 
actually using Pulsar with Athenz
----
2019-11-11 18:50:29 UTC - Joshua Dunham: Hi Everyone, I'm creating some 
relatively custom containers for Pulsar. Since Pulsar uses bookkeeper over 
ports will it kill performance if I have separate containers for each?
----
2019-11-11 18:51:51 UTC - Matteo Merli: No, that's the normal deployment 
option. A pulsar broker will typically send the data to multiple bookkeeper 
nodes anyway
----
2019-11-11 18:57:01 UTC - Joshua Dunham: Sounds great. Any value in stripping 
out the libs for the other apps / services?
----
2019-11-11 18:59:37 UTC - Matteo Merli: For example?
----
2019-11-11 19:38:53 UTC - Retardust: Yep. Because if I ack the 2nd message and 
fail to ack the first, will pulsar send me the first again or not? In failover 
mode. From an initial sequence of a, b messages, will I end up with b, a, b in 
the downstream topic or not?
----
2019-11-11 20:03:25 UTC - Joshua Dunham: libs ~ 
org.apache.bookkeeper-stream-storage-java-client-base-4.9.2.jar
----
2019-11-11 20:03:45 UTC - Joshua Dunham: ok, maybe not the bookkeeper client. 
:smile:
----
2019-11-11 20:05:30 UTC - Joshua Dunham: The libs seem like they support 
running the various services, which can be activated by setting the pulsar 
'mode' (./bin/pulsar broker vs. ./bin/pulsar bookie, etc.)
----
2019-11-11 21:31:31 UTC - Joshua Dunham: Hey Everyone,
----
2019-11-11 21:32:15 UTC - Joshua Dunham: If I have a highly nested URL for 
producers /1/2/3/4/5, how is the namespace calculated? Is it /1/ or everything 
before the topic?
----
2019-11-11 21:34:07 UTC - Matteo Merli: the topic itself cannot contain `/` 
characters
----
2019-11-11 21:34:33 UTC - Joshua Dunham: In my long example what would the 
tenant, namespace, and topic be?
----
2019-11-11 21:34:35 UTC - Matteo Merli: so, the form will always be: 
`<persistent://TENANT/NAMESPACE/TOPIC>`
----
2019-11-11 21:34:56 UTC - Matteo Merli: when passing a namespace name, you'd 
need to pass `TENANT/NAMESPACE`
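For example, a 2.0-style topic looks like this from the Python client (the 
tenant, namespace, and topic names below are placeholders):

    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    # tenant: my-tenant, namespace: my-namespace, topic: my-topic
    producer = client.create_producer('persistent://my-tenant/my-namespace/my-topic')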
----
2019-11-11 21:35:47 UTC - Joshua Dunham: I adapted the python example and this 
works.
----
2019-11-11 21:35:51 UTC - Joshua Dunham: producer = 
client.create_producer('persistent://1/2/3/4/5/6/7')
----
2019-11-11 21:36:11 UTC - Joshua Dunham: (I've replaced numbers with some other 
hierarchical info)
----
2019-11-11 21:36:43 UTC - Joshua Dunham: But of course there are different 
management tools (auth etc) that operate on the different levels.
----
2019-11-11 21:36:56 UTC - Joshua Dunham: I'm just not sure where Pulsar draws 
the lines if I provide so many.
----
2019-11-11 21:39:46 UTC - Matteo Merli: So, there's a bit of a long story 
:slightly_smiling_face:

Before Pulsar 2.0, we had naming like:

`<persistent://TENANT/CLUSTER/NAMESPACE/TOPIC>`

We removed cluster from the naming, but now we had to disallow `/` characters 
(which were previously allowed in v1.x) because we wouldn't be able to 
distinguish between old 1.x naming vs 2.x naming. (and we need to keep 
supporting the old topic names for existing deployments).
----
2019-11-11 21:40:07 UTC - Matteo Merli: In your case, 
`persistent://1/2/3/4/5/6/7` will be interpreted as :
----
2019-11-11 21:40:32 UTC - Matteo Merli: tenant: `1`
cluster: `2`
namespace `1/2/3`
----
2019-11-11 21:40:42 UTC - Matteo Merli: Meaning a v1.x style topic
----
2019-11-11 21:42:41 UTC - Joshua Dunham: hmm
----
2019-11-11 21:44:45 UTC - Joshua Dunham: I'll read more on these and the 
related management tools to see how it would fit my use case using fewer 
hierarchical elements.
----
2019-11-11 21:45:04 UTC - Joshua Dunham: I don't (think) I need the concept of 
tenants.
----
2019-11-11 21:45:48 UTC - Joshua Dunham: What I was going for is a sort of 
narrowing scope until some midpoint as a unique domain and then expanding on 
that for exact topics (to support schemas etc).
----
2019-11-11 21:46:36 UTC - Joshua Dunham: so like 
/base/applications/app1/{app1_logging, app1_metrics, app1_current_task}
----
2019-11-11 21:46:57 UTC - Joshua Dunham: If you consider the { } as brace 
expansion.
----
2019-11-11 21:47:35 UTC - Joshua Dunham: I would have multiple applications (as 
producers) committing messages to each 'sub-topic' (with set schemas).
----
2019-11-11 21:47:54 UTC - Joshua Dunham: Do you know of docs or writeups for 
this sort of strategy?
----
2019-11-11 22:07:25 UTC - Devin G. Bost: We’re running 4.2.0 on our brokers.
“Freeze” means that all consumers stop getting output from the topics on the 
particular broker involved.
I’m just trying to find a workaround so that when this happens, we have a way 
to ensure that we don’t have outages.
----
2019-11-11 23:34:44 UTC - Derek Rhodehamel: @Derek Rhodehamel has joined the 
channel
----
2019-11-11 23:42:30 UTC - Derek Rhodehamel: Hello,
I'm trying to use the pulsar go client library in an alpine docker container.
The issue is that I do not see a way to install the CPP bindings since an 
alpine apk is not mentioned in the docs.
Is there a known way to either build an alpine apk or use one of the existing 
builds in an alpine container?
----
2019-11-11 23:44:28 UTC - Ali Ahmed: @Dean Anderson you can compile the pulsar 
libs statically on any linux machine and copy them into the target container
+1 : Derek Rhodehamel
----
2019-11-11 23:51:19 UTC - Matteo Merli: If you're not scared easily, you can 
take a look at the WIP in <https://github.com/apache/pulsar-client-go>
----
2019-11-11 23:52:11 UTC - Matteo Merli: basic producer is mostly working, 
consumer is getting there
----
2019-11-12 00:25:40 UTC - Sijie Guo: Are you using functions to consume the 
messages? Or do you also use Java consumers in your services?

Stopping getting output from the topics looks like a problem we have seen with 
our users. It was related to permit issues in pulsar's flow control logic. We 
are still working on how to fix it, but it usually can be fixed by unloading the 
topics or the namespace. Have you tried unloading before?
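For reference, unloading can be done with pulsar-admin (the topic and namespace 
names below are placeholders):

    ./pulsar-admin topics unload persistent://my-tenant/my-namespace/my-topic
    ./pulsar-admin namespaces unload my-tenant/my-namespace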
----
2019-11-12 00:27:57 UTC - Sijie Guo: Do you want to create ledgers for storing 
sliding windows? Or do you want to access the ledgers of a topic to perform 
sliding windows?

If you can provide more details about this use case, I can help figure out the 
right approach for you.
----
2019-11-12 00:30:38 UTC - Sijie Guo: &gt; Because if I ack the 2nd message and 
fail to ack the first, will pulsar send me the first again or not?

If you are using individual acks, it will.

If you are using cumulative acks, it will not.

You don't need ordering when using individual acks, since the broker tracks 
individual acks and only redelivers the unacked messages. You also don't need 
ordering when using cumulative acks, since eventually it only tracks the highest 
watermark (the cursor position).
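A quick Python sketch of the two ack styles (the service URL, topic, and 
subscription name are placeholders):

    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    consumer = client.subscribe('persistent://public/default/my-topic',
                                'my-subscription')

    msg = consumer.receive()

    # Individual ack: only this message is marked acknowledged; any earlier
    # unacked messages will be redelivered.
    consumer.acknowledge(msg)

    # Cumulative ack (the alternative): everything up to and including this
    # message is acknowledged, so the broker only keeps the highest acked
    # position (the cursor).
    # consumer.acknowledge_cumulative(msg)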
----
2019-11-12 01:54:34 UTC - Bob Li: @Bob Li has joined the channel
----
2019-11-12 05:55:47 UTC - sundar: My friend has created another issue on github 
for the same problem. Could you respond there?

<https://github.com/apache/pulsar/issues/5619>
----
2019-11-12 05:57:47 UTC - sundar: Can you also tell me how to wipe the existing 
pulsar configuration and set the whole thing up again? If we run the metadata 
command again, it says the pulsar cluster is already present, but if I run the 
pulsar-admin command to view the clusters present, it does not work, so we don't 
know what to do. Could you please help us? We're college students and we don't 
know how to proceed.
Thanks
----
2019-11-12 06:01:06 UTC - Raghavi: Hi,
Can someone explain to me how to configure avro as the schema type for pulsar IO 
with the postgres debezium cdc connector? When I set the "schemaType" property in 
the connector configuration, I'm getting "AvroTypeException unknown type K". Can 
someone point me to a documentation/blog post explaining this in detail?
----
2019-11-12 06:41:37 UTC - Sijie Guo: @Raghavi Are you planning to consume the 
topics produced by cdc connector?
----
2019-11-12 06:41:51 UTC - Sijie Guo: Or what are you planning to do ?
----
2019-11-12 06:44:19 UTC - Sijie Guo: commented in the issue.
----
2019-11-12 06:44:46 UTC - sundar: Thanks will see immediately
----
2019-11-12 06:44:58 UTC - Sijie Guo: you can stop the cluster, delete the `data` 
directory, and re-set up the cluster.
----
2019-11-12 06:59:45 UTC - sundar: We set up the cluster using the initialize 
metadata command, but when we use the pulsar-admin command it comes out like this
----
2019-11-12 07:02:45 UTC - Sijie Guo: you need to set up the whole cluster before 
you can use `pulsar-admin`, because pulsar-admin talks to the brokers via port 8080
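Once the brokers are up, something like this should work (the admin URL is a 
placeholder for your broker host):

    ./pulsar-admin --admin-url http://broker-host:8080 clusters list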
----
2019-11-12 07:05:05 UTC - Raghavi: Our goal is to capture changes from one 
postgres source and persist the changes to another postgres destination. This 
should be done with a defined schema for each table and should have the ability 
to upgrade the schema whenever there is a change in the source. So I'm looking 
to work with avro schema, which is where I'm stuck.
----
2019-11-12 07:05:49 UTC - Raghavi: Using Postgres debezium source connector and 
jdbc sink connector
----
2019-11-12 07:06:46 UTC - Retardust: But there is a tricky part with negative 
acks. For example: I send 3 messages a, b, c downstream asynchronously. They get 
batched on the pulsar producer. I have 3 callbacks. I ack each cumulatively and 
send a negative ack when a send fails. Could I get into a situation where a is 
acked, b is negatively acked because its send failed, and c then overrides that 
negative ack?
----
2019-11-12 07:11:42 UTC - Sijie Guo: if they are individual acks, I don't see a 
problem there. They are different messages, no?
----
2019-11-12 07:19:58 UTC - Pradeep Mishra: ok, do we have any plan to add a 
fluentd plugin based on this lib?
----
2019-11-12 07:31:42 UTC - sundar: What do you mean by setting up the whole 
cluster? Is it setting up:
1. Zookeeper
----
2019-11-12 07:31:51 UTC - sundar: 2. Bookkeeper
3. Broker
----
2019-11-12 07:32:44 UTC - sundar: Also, if the DNS is a combination of host IPs 
like
<pulsar://ip1:port,ip2:port>, how will we access this from the client?
----
