2020-02-20 09:46:20 UTC - Yuvaraj Loganathan: For JWT you need to add the header
```Authorization: Bearer xxx```
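
A minimal sketch of attaching that header from Java, using only the JDK's `java.net.http` classes. The admin URL and the `xxx` token are placeholders, and the class and method names are hypothetical; the request is built but never sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class BearerHeaderSketch {
    // Builds (but does not send) a GET request carrying the JWT as a bearer token.
    static HttpRequest withBearer(String url, String jwt) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", "Bearer " + jwt)
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // Assumed local admin URL and placeholder token, for illustration only.
        HttpRequest req = withBearer("http://localhost:8080/admin/v2/clusters", "xxx");
        System.out.println(req.headers().firstValue("Authorization").orElse("<missing>"));
    }
}
```

The same header works with any HTTP client; only the `Bearer ` prefix followed by the token matters.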
----
2020-02-20 09:57:11 UTC - Steven Op de beeck: @Steven Op de beeck has joined 
the channel
----
2020-02-20 10:21:06 UTC - Steven Op de beeck: Hi all, I'm new to Pulsar. I'm 
using the pulsar-io-debezium-postgres connector. I'd like to translate source 
data (value) from the debezium connector into plain JSON, avoiding the overhead 
of AVRO schema data. What's the best way to approach this? /cc @jia zhai
a. Define a Pulsar function ?
b. Modify the Pulsar Debezium Connector?
c. Some configuration I'm unaware of?
----
2020-02-20 10:23:24 UTC - jia zhai: @Steven Le Roux Debezium should already 
support the JSON schema type; you could change the config YAML file.
----
2020-02-20 10:23:58 UTC - Steven Op de beeck: This is the file I'm using now:
```tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-producer-topic"
archive: "connectors/pulsar-io-debezium-postgres-2.5.0.nar"
parallelism: 1

configs:
  database.hostname: "producerdb"
  database.port: "5432"
  database.user: "debezium"
  database.password: "dbz"
  database.dbname: "pocdb"
  database.server.name: "producerdb"
  key.converter.schemas.enable: false
  key.converter: "org.apache.kafka.connect.json.JsonConverter"
  value.converter.schemas.enable: false
  value.converter: "org.apache.kafka.connect.json.JsonConverter"
  table.whitelist: "public.employee_update_event"
  pulsar.service.url: "pulsar://pulsar-es:6650"```
----
2020-02-20 10:24:23 UTC - jia zhai: right, the converter
----
2020-02-20 10:25:03 UTC - jia zhai: And after it is stored in Pulsar, we should use 
Pulsar's KeyValue schema type
----
2020-02-20 10:25:47 UTC - Steven Op de beeck: It still outputs a giant json 
with schemas inside.
----
2020-02-20 10:25:54 UTC - jia zhai: @tuteng I recall there is an example code 
for how to read data out from pulsar?
----
2020-02-20 10:25:56 UTC - Steven Op de beeck: 
```{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"producerdb.public.employee_update_event.Key"},"payload":{"id":388}�{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"int64","optional":true,"field":"aggregate_id"},{"type":"string","optional":true,"field":"aggregate_type"},{"type":"string","optional":true,"field":"content"},{"type":"string","optional":true,"field":"event_type"},{"type":"int64","optional":false,"field":"timestamp"}],"optional":true,"name":"producerdb.public.employee_update_event.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"int64","optional":true,"field":"aggregate_id"},{"type":"string","optional":true,"field":"aggregate_type"},{"type":"string","optional":true,"field":"content"},{"type":"string","optional":true,"field":"event_type"},{"type":"int64","optional":false,"field":"timestamp"}],"optional":true,"name":"producerdb.public.employee_update_event.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"producerdb
.public.employee_update_event.Envelope"},"payload":{"before":{"id":388,"aggregate_id":null,"aggregate_type":null,"content":null,"event_type":null,"timestamp":0},"after":null,"source":{"version":"0.10.0.Final","connector":"postgresql","name":"producerdb","ts_ms":1582194291258,"snapshot":"false","db":"pocdb","schema":"public","table":"employee_update_event","txId":885,"lsn":24883520,"xmin":null},"op":"d","ts_ms":1582194291263}}```
----
2020-02-20 10:27:28 UTC - Steven Op de beeck: You're suggesting that I 
shouldn't bother with what the json looks like on Pulsar, and use Pulsar key 
value to get the values out. And ignore what is actually stored on Pulsar?
----
2020-02-20 10:28:26 UTC - Steven Op de beeck: My current mindset is that we 
won't need the AVRO schema, and would like to get rid of the overhead it 
generates.
----
2020-02-20 10:31:06 UTC - jia zhai: right, Pulsar KeyValue Schema should be 
able to ser/des the messages in pulsar
----
2020-02-20 10:34:09 UTC - Steven Op de beeck: 
```value.converter.schemas.enable: false```
This isn't implemented then in the debezium-pulsar connector? Or it doesn't 
mean what I think it means.
----
2020-02-20 10:41:05 UTC - Steven Op de beeck: This is how we're currently 
consuming Pulsar data:
```private void received(Consumer<byte[]> consumer, Message<byte[]> message) {
        LocalDateTime messageReceivedTimestamp = now();
        String messageText = new String(message.getData());

        log.info("Message ({}) received @ {}, value: {}",
                message.getMessageId(), messageReceivedTimestamp, messageText);
        writeToPerformanceLog(messageReceivedTimestamp, messageText);
        try {
            consumer.acknowledge(message);
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }```
----
2020-02-20 10:41:32 UTC - Steven Op de beeck: I can do getValue, sure; but I'm 
just wondering if I can get rid of the AVRO overhead.
----
2020-02-20 10:43:14 UTC - tuteng: Sorry, 
<https://github.com/apache/pulsar/pull/6034> There are some comments on this PR 
that I haven't had time to solve. I will fix these problems as soon as possible.
----
2020-02-20 10:50:45 UTC - Steven Op de beeck: Or am I totally wrong about this 
all, and the above output example is not AVRO at all, just pure Pulsar? 
:fearful: And I'm worried about nothing.
----
2020-02-20 10:56:36 UTC - Steven Op de beeck: In that assumption, the question 
is: how do we consume what the debezium-connector produces. What's the consumer 
schema?
```            pulsarClient
                    .newConsumer(???)
                    .topic(topic)
                    .messageListener(this::received)
                    .subscribe();```
----
2020-02-20 12:25:23 UTC - Steven Op de beeck: We've verified that the KeyValue 
payload on Pulsar is Debezium-Connector specific, not a Pulsar technicality. 
Puzzled how to decode it on the consumer end.
----
2020-02-20 12:46:07 UTC - Miroslav Prymek: Could anyone please show me a valid 
protobuf schema in JSON format suitable for `pulsar-admin schemas upload` ?

I'm trying things like
```{
    "properties": {},
    "schema": "syntax = \"proto2\";\n\npackage proto;\n\noption java_package = 
\"fi.hsl.common.mqtt.proto\";\noption java_outer_classname = 
\"Mqtt\";\n\nmessage RawMessage {\n    required int32 SchemaVersion = 1 
[default = 1];\n    optional string topic = 2;\n    optional bytes payload = 
3;\n}\n",
    "type": "PROTOBUF"
}```
but without luck.

The point is, I want to consume protobuf-serialized messages from 
<https://github.com/HSLdevcom/mqtt-pulsar-gateway> in Java pulsar function. I 
can decode protobuf by hand but this is not very clean solution...
----
2020-02-20 12:47:08 UTC - Miroslav Prymek: BTW, it's very unfortunate that 
`schemas upload`  returns just
```HTTP 500 Internal Server Error

Reason: HTTP 500 Internal Server Error```
when the schema file is not valid :disappointed:
----
2020-02-20 13:20:51 UTC - rani: @rani has joined the channel
----
2020-02-20 13:43:21 UTC - Steve Kim: I apologize if this question has already 
been asked. (I searched in Slack history and did not find any relevant previous 
threads.) Is there a plan to start publishing the Python client as a Conda 
package 
(<https://docs.conda.io/projects/conda/en/latest/user-guide/concepts/packages.html>)
 that is distributed through a conda channel (e.g. <https://conda-forge.org>) 
in addition to the wheel files at <https://pypi.org/project/pulsar-client/> ?
----
2020-02-20 14:12:06 UTC - Miroslav Prymek: Ok, I understand it now - the 
`schema` property should be Avro JSON even when it's serialized using protobuf.
----
2020-02-20 15:15:27 UTC - tcourdy: ah so the functions are stored in the 
bookkeeper replicas not the broker replicas?
----
2020-02-20 15:15:50 UTC - tcourdy: so then I should set 
`numFunctionPackageReplicas` to the number of bookkeeper replicas that I have?
----
2020-02-20 15:44:16 UTC - Devin G. Bost: If that hasn't been done yet, I think 
that's a great idea. Could you please create a feature request in the Pulsar 
Github Issues?
----
2020-02-20 16:06:27 UTC - id3a: @id3a has joined the channel
----
2020-02-20 16:09:30 UTC - sathish: @sathish has joined the channel
----
2020-02-20 16:19:37 UTC - Rolf Arne Corneliussen: Thanks for your input, @Antti 
Kaikkonen. I have tried to read up on Pulsar, a complex system, but it seems 
very different from Kafka. For example, in Kafka the partitions of a topic are 
distributed among the members of a consumer group, and the consumer gets 
callbacks when a partition is assigned to or unassigned from it. I cannot see 
anything similar in Pulsar. Even if you use a Key_Shared subscription mode, the 
subscriber has no idea of the range of keys it handles (unless you use a sticky 
hash range, but that needs coordination to be fault tolerant). A min-heap on top 
of table service with a pluggable 'compare' function would be nice :-)
----
2020-02-20 16:31:56 UTC - Devin G. Bost: In my experience, JSON is usually more 
expensive to convert to/from than binary types like Avro. (Sometimes a LOT more 
expensive.)
----
2020-02-20 16:32:39 UTC - Devin G. Bost: Also, the schema features of Pulsar 
are not dependent on Avro.
----
2020-02-20 16:33:14 UTC - Devin G. Bost: What do you mean specifically by 
"overhead"?
----
2020-02-20 17:58:42 UTC - Mikhail Veygman: Hi.
----
2020-02-20 17:59:47 UTC - Mikhail Veygman: When sending async from the producer 
it creates a Future<MessageId> object which then completes (I am assuming 
through a thread pool in the background). Is the number of threads in that 
pool configurable?
----
2020-02-20 18:01:18 UTC - Matteo Merli: Yes: 
<https://pulsar.apache.org/api/client/2.5.0-SNAPSHOT/org/apache/pulsar/client/api/ClientBuilder.html#ioThreads-int->
----
2020-02-20 18:05:31 UTC - Mikhail Veygman: Oh!.  Ok.  The sort of documented 
feature.
----
2020-02-20 21:20:48 UTC - Antti Kaikkonen: I think that with exclusive/failover 
mode a partition is assigned to a single consumer but the problem is that if 
the consumer dies then it will be reassigned and there isn't a callback 
mechanism that you described.

Concurrent updates might be problematic with a heap.
----
2020-02-20 23:13:13 UTC - Tamer: Is there any recommendation on how to automate 
Pulsar function updates from a CI/CD pipeline?

Should I just embed the pulsar-admin cli in my build process?
----
2020-02-21 00:10:14 UTC - Ali Ahmed: @Abhilash Mandaliya
<https://github.com/aahmed-se/pulsar-io-gradle>
----
2020-02-21 00:23:47 UTC - Devin G. Bost: We ran into that issue before. The 
admin-cli was really slow for our purposes. It’s better to hit the REST 
endpoint directly.
----
2020-02-21 00:23:58 UTC - Devin G. Bost: (Admin REST endpoint.)
----
2020-02-21 01:52:26 UTC - Ken Huang: hi @David Kjerrumgaard, I use your 
Dockerfile to run a new pulsar
```docker run -it \
 -p 6650:6650 \
 -p 8080:8080 \
 -p 8081:8081 \
 -p 8443:8443 \
 -p 6651:6651 \
 -p 8888:8888 \
 pulsar-in-action:latest```
It works fine, but when I try to grant-permission, I get an "Authorization is 
not enabled   Reason: HTTP 501 Not Implemented" error.

What can I do to fix it? Thank you very much.
----
2020-02-21 01:53:21 UTC - Ken Huang: this is the command for grant-permission
```./bin/pulsar-admin namespaces grant-permission public/default \
      --role webapp \
      --actions produce,consume```
----
2020-02-21 04:38:51 UTC - Tamer: Thanks @Devin G. Bost, that sounds like a good 
option. I can either use the PulsarAdmin API or just hit the REST endpoint with curl.

How did you pass the jar file path?
Do you copy it to worker host (e.g scp)? Or use an http URL of the jar?
----
2020-02-21 04:40:57 UTC - Devin G. Bost: I recommend the REST endpoint directly.
Our first automation path was the CLI. That didn’t last long.
Our second automation path was the PulsarAdmin API. That lasted longer, but it 
still was a source of frustration.
Our third automation path was to write a Go consumer that listened to a Pulsar 
topic, got our desired data, and hit the REST endpoint directly. That worked 
amazingly well and was much faster than all of the others. It’s still running 
in production.
----
2020-02-21 04:41:51 UTC - Devin G. Bost: All you need is a URL where Pulsar can 
download the Jar, so you can use any cloud storage for that.
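
A rough sketch of what such a direct REST update could look like from Java, using only the JDK. It assumes the functions admin endpoint is `PUT /admin/v3/functions/{tenant}/{namespace}/{name}` taking multipart form fields `url` and `functionConfig`; those names, the host, the function name and the jar URL are assumptions for illustration, so check them against the admin REST docs for your Pulsar version. The request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class FunctionUpdateSketch {
    static final String BOUNDARY = "----pulsar-fn-sketch";

    // One text part of a multipart/form-data body.
    static String part(String name, String contentType, String value) {
        return "--" + BOUNDARY + "\r\n"
                + "Content-Disposition: form-data; name=\"" + name + "\"\r\n"
                + "Content-Type: " + contentType + "\r\n\r\n"
                + value + "\r\n";
    }

    // Full body: package URL part, function config part, closing boundary.
    static String body(String packageUrl, String functionConfigJson) {
        return part("url", "text/plain", packageUrl)
                + part("functionConfig", "application/json", functionConfigJson)
                + "--" + BOUNDARY + "--\r\n";
    }

    // Assumed endpoint shape; verify against your broker's admin API.
    static HttpRequest updateRequest(String adminBase, String tenant, String ns,
                                     String name, String packageUrl, String configJson) {
        URI uri = URI.create(adminBase + "/admin/v3/functions/" + tenant + "/" + ns + "/" + name);
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "multipart/form-data; boundary=" + BOUNDARY)
                .PUT(HttpRequest.BodyPublishers.ofString(body(packageUrl, configJson)))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical function name and jar location.
        HttpRequest req = updateRequest("http://localhost:8080", "public", "default", "my-fn",
                "https://example.com/artifacts/my-fn.jar", "{\"parallelism\":1}");
        System.out.println(req.method() + " " + req.uri());
    }
}
```

In a CI/CD job the built request would be passed to `HttpClient.newHttpClient().send(...)`, with a non-2xx status failing the pipeline step.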
----
2020-02-21 04:43:16 UTC - Devin G. Bost: I’ve been working on getting 
permission for our team to open source our deployment tool. That effort is 
currently in progress.
----
2020-02-21 05:31:57 UTC - Tamer: Thank you so much for the details. I will use 
the REST API directly. Maybe later I will consider writing a small wrapper in 
rust to have a fast cli.

When you update the function do you just make the `update` api call? 

I've noticed that when I manually update the function, it occasionally fails 
with a 503 error, and I need to delete and recreate the function to fix it.

Have you ever seen the update call fail? Do you stop the function first?
----
2020-02-21 05:33:34 UTC - Devin G. Bost: Hmm the 500 errors sound familiar. 
What version are you running?
----
2020-02-21 05:35:24 UTC - Devin G. Bost: It’s possible that there’s an 
unresolved bug with the update process.
It would be extremely helpful if you could please capture log details when that 
happens again and create a Github Issue so the bug report can go through triage 
and get looked at in detail. (Basically, the way the Pulsar community does 
things is that the issue must be reported as a Github Issue before it can be 
resolved.)
----
2020-02-21 05:35:38 UTC - Devin G. Bost: The more detail you can provide the 
better.
----
2020-02-21 05:36:29 UTC - Devin G. Bost: Yes, I believe we make the `update`  
API call. The documentation wasn’t great, so if you need help, I can take a 
look at how we’ve done it.
----
2020-02-21 05:44:11 UTC - Tamer: I was running 2.4.2 but now upgraded to 2.5.0, 
will test again once I have the CI/CD working
----
2020-02-21 05:44:23 UTC - Devin G. Bost: Sounds good. Thanks.
----
2020-02-21 05:44:30 UTC - Devin G. Bost: Please keep us posted.
----
2020-02-21 05:47:02 UTC - Tamer: Thanks for the detailed answers.

I am actually helping 3 clients at the moment to migrate to Pulsar and all of 
them needed this CI/CD for the pulsar function. 2 clients using Jenkins and the 
other one gitlab runner.

I will definitely share my updates and will write a blog post at the end.
----
2020-02-21 05:47:29 UTC - Devin G. Bost: Sounds great!
----
2020-02-21 06:38:45 UTC - Pushkar Sawant: @Pushkar Sawant has joined the channel
----
2020-02-21 06:45:56 UTC - Pushkar Sawant: Hi,
Has anyone experienced an issue where an entire write quorum has gone down? 
What’s the best course of action to recover from such a failure?
----
2020-02-21 07:36:40 UTC - tuteng: I think you can try to consume it using 
KeyValue schema.
```byte[] encodeBytes = Schema.KV_BYTES().encode(new KeyValue<>(fooBytes, barBytes));
KeyValue<byte[], byte[]> decodeKV = Schema.KV_BYTES().decode(encodeBytes);```
----
2020-02-21 07:38:50 UTC - tuteng: 
<https://github.com/apache/pulsar/blob/master/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java#L364>
 , then convert bytes to json
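
To make the "bytes to JSON" step concrete, here is a small JDK-only sketch of splitting a raw payload into key and value. It assumes the payload uses an inline layout of a 4-byte big-endian key length, the key bytes, a 4-byte value length, then the value bytes, which would explain the unprintable character between the two JSON documents in the dump earlier in the thread. That layout is an assumption, not something confirmed here; with the Pulsar client on the classpath, `Schema.KV_BYTES().decode(...)` from the snippet above is the safer route. Class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class KeyValueBytesSketch {
    // Splits [int keyLen][key][int valueLen][value] into {key, value}.
    // Does not handle negative lengths (possible null markers).
    static byte[][] split(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data); // big-endian by default
        byte[] key = new byte[buf.getInt()];
        buf.get(key);
        byte[] value = new byte[buf.getInt()];
        buf.get(value);
        return new byte[][] {key, value};
    }

    // Builds the same assumed layout; used here only to produce sample input.
    static byte[] join(byte[] key, byte[] value) {
        return ByteBuffer.allocate(4 + key.length + 4 + value.length)
                .putInt(key.length).put(key)
                .putInt(value.length).put(value)
                .array();
    }

    public static void main(String[] args) {
        byte[] sample = join("{\"id\":388}".getBytes(StandardCharsets.UTF_8),
                             "{\"op\":\"d\"}".getBytes(StandardCharsets.UTF_8));
        byte[][] kv = split(sample);
        System.out.println("key   = " + new String(kv[0], StandardCharsets.UTF_8));
        System.out.println("value = " + new String(kv[1], StandardCharsets.UTF_8));
    }
}
```

Each half is then an ordinary UTF-8 JSON string that any JSON library can parse to pull out the `payload` section.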
----
2020-02-21 08:15:12 UTC - Steven Op de beeck: @Devin G. Bost Thanks for your 
response. What I mean by overhead is this: 
<https://gist.github.com/stevenodb/efa02194d911b185c2e878f0218de736>
While what we are interested in is this small part:
```{
      "id": 27,
      "aggregate_id": 27,
      "aggregate_type": "Employee",
      "content": "{\"name\":\"robin\"}",
      "event_type": "EmployeeUpdated",
      "timestamp": 58256225452041
 }```

----
2020-02-21 08:52:44 UTC - Steven Op de beeck: @tuteng Thanks, I will give that 
a try.
----
