2020-02-20 09:46:20 UTC - Yuvaraj Loganathan: For JWT you need to add the header
```Authorization: Bearer xxx```
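A minimal sketch of attaching that header to a Pulsar admin REST call with the JDK's `java.net.http` client. The admin URL (`http://localhost:8080`), the `/admin/v2/tenants` endpoint and the token value are placeholders chosen for illustration; actually sending the request assumes a reachable broker with token authentication enabled.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: send a JWT to the Pulsar admin REST API via the Authorization header.
// The admin URL and token are placeholders, not values from this thread.
final class JwtAdminRequest {

    // Build a GET request for the tenant list with a Bearer token attached.
    static HttpRequest listTenants(String adminUrl, String token) {
        return HttpRequest.newBuilder()
                .uri(URI.create(adminUrl + "/admin/v2/tenants"))
                .header("Authorization", "Bearer " + token)
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = listTenants("http://localhost:8080", "xxx");
        // Requires a running broker; otherwise send() throws a connection error.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```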
----
2020-02-20 09:57:11 UTC - Steven Op de beeck: @Steven Op de beeck has joined
the channel
----
2020-02-20 10:21:06 UTC - Steven Op de beeck: Hi all, I'm new to Pulsar. I'm
using the pulsar-io-debezium-postgres connector. I'd like to translate source
data (value) from the debezium connector into plain JSON, avoiding the overhead
of AVRO schema data. What's the best way to approach this? /cc @jia zhai
a. Define a Pulsar function ?
b. Modify the Pulsar Debezium Connector?
c. Some configuration I'm unaware of?
----
2020-02-20 10:23:24 UTC - jia zhai: @Steven Le Roux Debezium should already
support the JSON schema type; you could change the config YAML file.
----
2020-02-20 10:23:58 UTC - Steven Op de beeck: This is the file I'm using now:
```tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-producer-topic"
archive: "connectors/pulsar-io-debezium-postgres-2.5.0.nar"
parallelism: 1
configs:
  database.hostname: "producerdb"
  database.port: "5432"
  database.user: "debezium"
  database.password: "dbz"
  database.dbname: "pocdb"
  database.server.name: "producerdb"
  key.converter.schemas.enable: false
  key.converter: "org.apache.kafka.connect.json.JsonConverter"
  value.converter.schemas.enable: false
  value.converter: "org.apache.kafka.connect.json.JsonConverter"
  table.whitelist: "public.employee_update_event"
  pulsar.service.url: "pulsar://pulsar-es:6650"```
----
2020-02-20 10:24:23 UTC - jia zhai: right, the converter
----
2020-02-20 10:25:03 UTC - jia zhai: And after it is stored in Pulsar, we should use
Pulsar's KeyValue schema type.
----
2020-02-20 10:25:47 UTC - Steven Op de beeck: It still outputs a giant json
with schemas inside.
----
2020-02-20 10:25:54 UTC - jia zhai: @tuteng I recall there is some example code
showing how to read the data out of Pulsar?
----
2020-02-20 10:25:56 UTC - Steven Op de beeck:
```{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"producerdb.public.employee_update_event.Key"},"payload":{"id":388}�{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"int64","optional":true,"field":"aggregate_id"},{"type":"string","optional":true,"field":"aggregate_type"},{"type":"string","optional":true,"field":"content"},{"type":"string","optional":true,"field":"event_type"},{"type":"int64","optional":false,"field":"timestamp"}],"optional":true,"name":"producerdb.public.employee_update_event.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"int64","optional":true,"field":"aggregate_id"},{"type":"string","optional":true,"field":"aggregate_type"},{"type":"string","optional":true,"field":"content"},{"type":"string","optional":true,"field":"event_type"},{"type":"int64","optional":false,"field":"timestamp"}],"optional":true,"name":"producerdb.public.employee_update_event.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"producerdb.public.employee_update_event.Envelope"},"payload":{"before":{"id":388,"aggregate_id":null,"aggregate_type":null,"content":null,"event_type":null,"timestamp":0},"after":null,"source":{"version":"0.10.0.Final","connector":"postgresql","name":"producerdb","ts_ms":1582194291258,"snapshot":"false","db":"pocdb","schema":"public","table":"employee_update_event","txId":885,"lsn":24883520,"xmin":null},"op":"d","ts_ms":1582194291263}}```
----
2020-02-20 10:27:28 UTC - Steven Op de beeck: You're suggesting that I
shouldn't bother with what the JSON looks like in Pulsar, and should use Pulsar's
KeyValue schema to get the values out, ignoring what is actually stored in Pulsar?
----
2020-02-20 10:28:26 UTC - Steven Op de beeck: My current mindset is that we
won't need the AVRO schema, and would like to get rid of the overhead it
generates.
----
2020-02-20 10:31:06 UTC - jia zhai: Right, the Pulsar KeyValue schema should be
able to serialize/deserialize the messages in Pulsar.
----
2020-02-20 10:34:09 UTC - Steven Op de beeck:
```value.converter.schemas.enable: false```
So this isn't implemented in the debezium-pulsar connector? Or does it not
mean what I think it means?
----
2020-02-20 10:41:05 UTC - Steven Op de beeck: This is how we're currently
consuming Pulsar data:
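```import static java.time.LocalDateTime.now;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;

// Message listener callback; `log` and writeToPerformanceLog are defined elsewhere in the class.
private void received(Consumer<byte[]> consumer, Message<byte[]> message) {
    LocalDateTime messageReceivedTimestamp = now();
    // Decode the raw payload explicitly as UTF-8 instead of the platform default charset.
    String messageText = new String(message.getData(), StandardCharsets.UTF_8);
    log.info("Message ({}) received @ {}, value: {}",
            message.getMessageId(), messageReceivedTimestamp, messageText);
    writeToPerformanceLog(messageReceivedTimestamp, messageText);
    try {
        consumer.acknowledge(message);
    } catch (PulsarClientException e) {
        throw new RuntimeException(e);
    }
}```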
----
2020-02-20 10:41:32 UTC - Steven Op de beeck: I can do getValue, sure; but I'm
just wondering if I can get rid of the AVRO overhead.
----
2020-02-20 10:43:14 UTC - tuteng: Sorry, there are some comments on
<https://github.com/apache/pulsar/pull/6034> that I haven't had time to address
yet. I will fix these problems as soon as possible.
----
2020-02-20 10:50:45 UTC - Steven Op de beeck: Or am I totally wrong about this
all, and the above output example is not AVRO at all, just pure Pulsar?
:fearful: And I'm worried about nothing.
----
2020-02-20 10:56:36 UTC - Steven Op de beeck: Under that assumption, the question
is: how do we consume what the debezium-connector produces? What's the consumer
schema?
``` pulsarClient
.newConsumer(???)
.topic(topic)
.messageListener(this::received)
.subscribe();```
----
2020-02-20 12:25:23 UTC - Steven Op de beeck: We've verified that the KeyValue
payload on Pulsar is specific to the Debezium connector, not a Pulsar technicality.
We're puzzled about how to decode it on the consumer end.
----
2020-02-20 12:46:07 UTC - Miroslav Prymek: Could anyone please show me a valid
Protobuf schema in JSON format suitable for `pulsar-admin schemas upload`?
I'm trying things like
```{
  "properties": {},
  "schema": "syntax = \"proto2\";\n\npackage proto;\n\noption java_package = \"fi.hsl.common.mqtt.proto\";\noption java_outer_classname = \"Mqtt\";\n\nmessage RawMessage {\n required int32 SchemaVersion = 1 [default = 1];\n optional string topic = 2;\n optional bytes payload = 3;\n}\n",
  "type": "PROTOBUF"
}```
but without luck.
The point is, I want to consume protobuf-serialized messages from
<https://github.com/HSLdevcom/mqtt-pulsar-gateway> in Java pulsar function. I
can decode Protobuf by hand, but this is not a very clean solution...
----
2020-02-20 12:47:08 UTC - Miroslav Prymek: BTW, it's very unfortunate that
`schemas upload` returns just
```HTTP 500 Internal Server Error
Reason: HTTP 500 Internal Server Error```
when the schema file is not valid :disappointed:
----
2020-02-20 13:20:51 UTC - rani: @rani has joined the channel
----
2020-02-20 13:43:21 UTC - Steve Kim: I apologize if this question has already
been asked. (I searched in Slack history and did not find any relevant previous
threads.) Is there a plan to start publishing the Python client as a Conda
package
(<https://docs.conda.io/projects/conda/en/latest/user-guide/concepts/packages.html>)
that is distributed through a conda channel (e.g. <https://conda-forge.org>)
in addition to the wheel files at <https://pypi.org/project/pulsar-client/> ?
----
2020-02-20 14:12:06 UTC - Miroslav Prymek: OK, I understand it now: the
`schema` property should be an Avro schema in JSON form, even when the messages
are serialized using Protobuf.
----
2020-02-20 15:15:27 UTC - tcourdy: ah so the functions are stored in the
bookkeeper replicas not the broker replicas?
----
2020-02-20 15:15:50 UTC - tcourdy: so then I should set
`numFunctionPackageReplicas` to the number of bookkeeper replicas that I have?
----
2020-02-20 15:44:16 UTC - Devin G. Bost: If that hasn't been done yet, I think
that's a great idea. Could you please create a feature request in the Pulsar
GitHub issues?
----
2020-02-20 16:06:27 UTC - id3a: @id3a has joined the channel
----
2020-02-20 16:09:30 UTC - sathish: @sathish has joined the channel
----
2020-02-20 16:19:37 UTC - Rolf Arne Corneliussen: Thanks for your input, @Antti
Kaikkonen. I have tried to read up on Pulsar, a complex system, but it seems
very different from Kafka. For example, in Kafka the partitions of a topic are
distributed among the members of a consumer group, and the consumer gets
callbacks when a partition is assigned/unassigned to it. I cannot see anything
similar in Pulsar. Even if you use a Key_Shared subscription mode, the
subscriber has no idea of the range of keys it handles (unless you use a sticky
hash range, but that needs coordination to be fault tolerant). A min-heap on top
of the table service with a pluggable 'compare' function would be nice :-)
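A toy illustration of why a sticky hash range lets a consumer know which keys it handles: once a consumer is pinned to a fixed slot range, it can decide locally whether a key is its own. This is not Pulsar's implementation; the 65536-slot hash space and the use of CRC32 as the hash are assumptions made only for this sketch, and Pulsar's real hash function and range size may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Toy sketch of sticky hash-range ownership for a Key_Shared-style subscription.
// ASSUMPTIONS: a 65536-slot hash space and CRC32 as a stand-in hash function.
final class StickyRangeOwnership {
    static final int HASH_RANGE_SIZE = 65536;

    // Map a message key to a slot in [0, HASH_RANGE_SIZE).
    static int slotFor(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % HASH_RANGE_SIZE);
    }

    // A consumer pinned to the inclusive range [start, end] can check ownership
    // of a key without any assignment callback from the broker.
    static boolean owns(int start, int end, String key) {
        int slot = slotFor(key);
        return slot >= start && slot <= end;
    }

    public static void main(String[] args) {
        String key = "employee-27"; // hypothetical key
        System.out.println(key + " -> slot " + slotFor(key)
                + ", first half owns it: " + owns(0, 32767, key)
                + ", second half owns it: " + owns(32768, 65535, key));
    }
}
```

The catch Rolf points out still applies: if a pinned consumer dies, someone has to coordinate who takes over its range.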
----
2020-02-20 16:31:56 UTC - Devin G. Bost: In my experience, JSON is usually more
expensive to convert to/from than binary types like Avro. (Sometimes a LOT more
expensive.)
----
2020-02-20 16:32:39 UTC - Devin G. Bost: Also, the schema features of Pulsar
are not dependent on Avro.
----
2020-02-20 16:33:14 UTC - Devin G. Bost: What do you mean specifically by
"overhead"?
----
2020-02-20 17:58:42 UTC - Mikhail Veygman: Hi.
----
2020-02-20 17:59:47 UTC - Mikhail Veygman: When sending async from the producer,
it creates a Future<MessageId> object which then completes (I am assuming
through a thread pool in the background). Is the number of threads in that
pool configurable?
----
2020-02-20 18:01:18 UTC - Matteo Merli: Yes:
<https://pulsar.apache.org/api/client/2.5.0-SNAPSHOT/org/apache/pulsar/client/api/ClientBuilder.html#ioThreads-int->
----
2020-02-20 18:05:31 UTC - Mikhail Veygman: Oh! OK. A sort-of-documented
feature.
----
2020-02-20 21:20:48 UTC - Antti Kaikkonen: I think that with exclusive/failover
mode a partition is assigned to a single consumer, but the problem is that if
the consumer dies, the partition will be reassigned and there isn't a callback
mechanism like the one you described.
Concurrent updates might be problematic with a heap.
----
2020-02-20 23:13:13 UTC - Tamer: Is there any recommendation on how to automate
Pulsar function updates from a CI/CD pipeline?
Should I just embed the pulsar-admin cli in my build process?
----
2020-02-21 00:10:14 UTC - Ali Ahmed: @Abhilash Mandaliya
<https://github.com/aahmed-se/pulsar-io-gradle>
----
2020-02-21 00:23:47 UTC - Devin G. Bost: We ran into that issue before. The
admin-cli was really slow for our purposes. It’s better to hit the REST
endpoint directly.
----
2020-02-21 00:23:58 UTC - Devin G. Bost: (Admin REST endpoint.)
----
2020-02-21 01:52:26 UTC - Ken Huang: hi @David Kjerrumgaard, I use your
Dockerfile to run a new pulsar
```docker run -it \
-p 6650:6650 \
-p 8080:8080 \
-p 8081:8081 \
-p 8443:8443 \
-p 6651:6651 \
-p 8888:8888 \
pulsar-in-action:latest```
It works fine, but when I try to grant permissions, I get an "Authorization is
not enabled Reason: HTTP 501 Not Implemented" error.
What can I do to fix it? Thank you very much.
----
2020-02-21 01:53:21 UTC - Ken Huang: this is the command for grant-permission
```./bin/pulsar-admin namespaces grant-permission public/default \
--role webapp \
--actions produce,consume```
----
2020-02-21 04:38:51 UTC - Tamer: Thanks @Devin G. Bost, that sounds like a good
option. I can either use the PulsarAdmin API or just hit the REST endpoint with curl.
How did you pass the jar file path?
Do you copy it to the worker host (e.g. scp)? Or use an HTTP URL for the jar?
----
2020-02-21 04:40:57 UTC - Devin G. Bost: I recommend the REST endpoint directly.
Our first automation path was the CLI. That didn’t last long.
Our second automation path was the PulsarAdmin API. That lasted longer, but it
still was a source of frustration.
Our third automation path was to write a Go consumer that listened to a Pulsar
topic, got our desired data, and hit the REST endpoint directly. That worked
amazingly well and was much faster than all of the others. It’s still running
in production.
----
2020-02-21 04:41:51 UTC - Devin G. Bost: All you need is a URL where Pulsar can
download the Jar, so you can use any cloud storage for that.
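A minimal sketch of what a CI/CD step could send instead of calling the CLI, using only the JDK HTTP client. It assumes the v3 functions endpoint (`PUT /admin/v3/functions/{tenant}/{namespace}/{name}`) accepts a multipart/form-data body with a `url` part pointing at the downloadable jar and a `functionConfig` JSON part; verify both against the REST API docs for your Pulsar version. The function name, jar URL and config values in `main` are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.UUID;

// Sketch: update a Pulsar Function through the admin REST API.
// ASSUMPTION: the endpoint takes "url" and "functionConfig" multipart form parts.
final class FunctionUpdateRequest {
    private static final String CRLF = "\r\n";

    // Hand-build a multipart/form-data body with the two parts.
    static String multipartBody(String boundary, String jarUrl, String functionConfigJson) {
        return "--" + boundary + CRLF
                + "Content-Disposition: form-data; name=\"url\"" + CRLF + CRLF
                + jarUrl + CRLF
                + "--" + boundary + CRLF
                + "Content-Disposition: form-data; name=\"functionConfig\"" + CRLF
                + "Content-Type: application/json" + CRLF + CRLF
                + functionConfigJson + CRLF
                + "--" + boundary + "--" + CRLF;
    }

    static HttpRequest build(String adminUrl, String tenant, String namespace,
                             String name, String jarUrl, String functionConfigJson) {
        String boundary = "pulsar-" + UUID.randomUUID();
        return HttpRequest.newBuilder()
                .uri(URI.create(adminUrl + "/admin/v3/functions/"
                        + tenant + "/" + namespace + "/" + name))
                .header("Content-Type", "multipart/form-data; boundary=" + boundary)
                .PUT(HttpRequest.BodyPublishers.ofString(
                        multipartBody(boundary, jarUrl, functionConfigJson)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical values: replace with your own function, jar location and admin URL.
        String config = "{\"tenant\":\"public\",\"namespace\":\"default\","
                + "\"name\":\"my-function\",\"className\":\"com.example.MyFunction\"}";
        HttpRequest request = build("http://localhost:8080", "public", "default",
                "my-function", "https://example.com/artifacts/my-function.jar", config);
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Because the jar is referenced by URL, the CI job only has to upload the artifact to storage the worker can reach and then issue this one request.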
----
2020-02-21 04:43:16 UTC - Devin G. Bost: I’ve been working on getting
permission for our team to open source our deployment tool. That effort is
currently in progress.
----
2020-02-21 05:31:57 UTC - Tamer: Thank you so much for the details. I will use
the REST API directly. Maybe later I will consider writing a small wrapper in
Rust to have a fast CLI.
When you update the function, do you just make the `update` API call?
I noticed that when I manually update the function, it occasionally fails
with a 503 error and I need to delete and recreate the function to fix it.
Have you ever had this issue where the update call fails? Do you stop the
function first?
----
2020-02-21 05:33:34 UTC - Devin G. Bost: Hmm the 500 errors sound familiar.
What version are you running?
----
2020-02-21 05:35:24 UTC - Devin G. Bost: It’s possible that there’s an
unresolved bug with the update process.
It would be extremely helpful if you could please capture log details when that
happens again and create a Github Issue so the bug report can go through triage
and get looked at in detail. (Basically, the way the Pulsar community does
things is that the issue must be reported as a Github Issue before it can be
resolved.)
----
2020-02-21 05:35:38 UTC - Devin G. Bost: The more detail you can provide the
better.
----
2020-02-21 05:36:29 UTC - Devin G. Bost: Yes, I believe we make the `update`
API call. The documentation wasn’t great, so if you need help, I can take a
look at how we’ve done it.
----
2020-02-21 05:44:11 UTC - Tamer: I was running 2.4.2 but now upgraded to 2.5.0,
will test again once I have the CI/CD working
----
2020-02-21 05:44:23 UTC - Devin G. Bost: Sounds good. Thanks.
----
2020-02-21 05:44:30 UTC - Devin G. Bost: Please keep us posted.
----
2020-02-21 05:47:02 UTC - Tamer: Thanks for the detailed answers.
I am actually helping 3 clients at the moment to migrate to Pulsar, and all of
them needed this CI/CD for Pulsar Functions. Two clients use Jenkins and the
other uses GitLab Runner.
I will definitely share my updates and will write a blog post at the end.
----
2020-02-21 05:47:29 UTC - Devin G. Bost: Sounds great!
----
2020-02-21 06:38:45 UTC - Pushkar Sawant: @Pushkar Sawant has joined the channel
----
2020-02-21 06:45:56 UTC - Pushkar Sawant: Hi,
Has anyone experienced an issue where an entire write quorum has gone down?
What's the best course of action to recover from such a failure?
----
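2020-02-21 07:36:40 UTC - tuteng: I think you can try to consume it using the
KeyValue schema.
```import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;

// fooBytes and barBytes stand for the raw key and value bytes
byte[] encodeBytes = Schema.KV_BYTES().encode(new KeyValue<>(fooBytes, barBytes));
KeyValue<byte[], byte[]> decodeKV = Schema.KV_BYTES().decode(encodeBytes);```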
----
2020-02-21 07:38:50 UTC - tuteng:
<https://github.com/apache/pulsar/blob/master/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java#L364>
, then convert the bytes to JSON.
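If the topic is read as raw bytes (as in the `Consumer<byte[]>` listener earlier), the key and value have to be split apart before either can be parsed as JSON. A minimal stdlib-only sketch, assuming the INLINE KeyValue layout is a 4-byte big-endian key length, the key bytes, a 4-byte value length, then the value bytes; that is our reading of Pulsar's encoding, and in practice `Schema.KV_BYTES()` does this for you. If the assumption holds, it would also explain the unprintable character between the key JSON and the value JSON in the raw dump earlier in the thread.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of splitting an INLINE KeyValue payload by hand.
// ASSUMPTION: [int keyLength][key][int valueLength][value], big-endian lengths.
final class InlineKeyValue {

    // Holds the two halves; for the Debezium source each half is a JSON document.
    static final class Pair {
        final byte[] key;
        final byte[] value;

        Pair(byte[] key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    static byte[] encode(byte[] key, byte[] value) {
        return ByteBuffer.allocate(4 + key.length + 4 + value.length)
                .putInt(key.length).put(key)
                .putInt(value.length).put(value)
                .array();
    }

    static Pair decode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload); // ByteBuffer is big-endian by default
        byte[] key = readPart(buf);
        byte[] value = readPart(buf);
        return new Pair(key, value);
    }

    // Read one length-prefixed part; a negative length is treated as absent (an assumption).
    private static byte[] readPart(ByteBuffer buf) {
        int length = buf.getInt();
        if (length < 0) {
            return null;
        }
        byte[] part = new byte[length];
        buf.get(part);
        return part;
    }

    public static void main(String[] args) {
        byte[] payload = encode("{\"id\":388}".getBytes(StandardCharsets.UTF_8),
                "{\"op\":\"d\"}".getBytes(StandardCharsets.UTF_8));
        Pair kv = decode(payload);
        System.out.println(new String(kv.key, StandardCharsets.UTF_8));   // {"id":388}
        System.out.println(new String(kv.value, StandardCharsets.UTF_8)); // {"op":"d"}
    }
}
```

The decoded halves are still the full Debezium JSON (schema plus payload); picking out the `after` or `before` row needs a JSON library on top of this.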
----
2020-02-21 08:15:12 UTC - Steven Op de beeck: @Devin G. Bost Thanks for your
response. What I mean by overhead is this:
<https://gist.github.com/stevenodb/efa02194d911b185c2e878f0218de736>
whereas what we are interested in is only this small part:
```{
"id": 27,
"aggregate_id": 27,
"aggregate_type": "Employee",
"content": "{\"name\":\"robin\"}",
"event_type": "EmployeeUpdated",
"timestamp": 58256225452041
}```
----
2020-02-21 08:52:44 UTC - Steven Op de beeck: @tuteng Thanks, I will give that
a try.
----