2019-03-03 09:35:58 UTC - Ben S: @Ben S has joined the channel
----
2019-03-03 09:40:02 UTC - Ben S: Hey all!
----
2019-03-03 09:40:36 UTC - Ali Ahmed: Hello
----
2019-03-03 09:44:16 UTC - Ben S: I have a problem I’m currently stuck on. Consider the following example:
- There are tens of thousands of shops around the world where internal processes should be recorded
- For each shop location, the order of internal process tasks must be preserved
- There is no global list of shops. Process tasks may arrive from a location without any previous knowledge of that location
- Recording the process tasks is critical; no task may be missed (failover)
My understanding is that in order to preserve the order of processes within each location, I need to create a topic for each store location like this: `persistent://public/default/store-san-francisco`
And for each topic I need to create an exclusive consumer with failover. The problem is that since I do not know the locations in advance, what is the best way to create and destroy consumers ad hoc for each unknown location?
----
2019-03-03 09:46:42 UTC - Ali Ahmed: you can create a consumer at the same time you start a producer with a new topic
----
2019-03-03 09:55:14 UTC - Ben S: Oh, I realize I oversimplified my example. My actual case is more complex, and different. I need to add one more point to the example:
- There are multiple processes. Ordering must be kept within each process, but in order for the system to scale, it is important that individual process steps can be parallelized.
----
2019-03-03 09:57:35 UTC - Ali Ahmed: are you trying to replicate a traditional job queue with an arbitrary number of workers?
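The pattern usually reached for with "ordering per key, parallelism across keys" is key-based routing: instead of one topic and one consumer per location, tag each task with its location (or process) as a key and send every task with the same key to the same worker. Below is a minimal, library-free sketch of that routing idea; `worker_for_key` and `dispatch` are hypothetical names and no Pulsar API is used. With Pulsar, the assumed equivalent is setting the location as the message key on a partitioned topic so that same-key messages land on the same partition; check the routing-mode details in the Pulsar docs for the version in use.

```python
import zlib
from collections import defaultdict


def worker_for_key(key: str, num_workers: int) -> int:
    """Map a routing key (e.g. a shop location) to a stable worker index.

    zlib.crc32 is used instead of hash() because Python randomizes str
    hashes per process, and the key-to-worker mapping must be stable.
    """
    return zlib.crc32(key.encode("utf-8")) % num_workers


def dispatch(tasks, num_workers):
    """Split an ordered stream of (key, task) pairs into per-worker queues.

    Every task with a given key goes to the same worker queue, appended in
    arrival order, so per-key order is preserved while different keys can be
    processed by different workers in parallel.
    """
    queues = defaultdict(list)
    for key, task in tasks:
        queues[worker_for_key(key, num_workers)].append((key, task))
    return dict(queues)


if __name__ == "__main__":
    stream = [
        ("san-francisco", "open"),
        ("berlin", "open"),
        ("san-francisco", "count-till"),
        ("tokyo", "open"),
        ("berlin", "close"),
        ("san-francisco", "close"),
    ]
    for worker, queue in sorted(dispatch(stream, num_workers=2).items()):
        print(worker, queue)
```

One caveat of this design: changing `num_workers` remaps keys, so per-key ordering is only guaranteed while the worker count stays fixed.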
----
2019-03-03 09:59:47 UTC - Ben S: In some way, it is a combination of a tiny job queue with two consecutive tasks, but it’s part of a real-time event system
----
2019-03-03 10:01:30 UTC - Ali Ahmed: you should be able to accomplish it with <https://pulsar.apache.org/docs/latest/functions/overview/>
----
2019-03-03 10:01:35 UTC - Ali Ahmed: <https://streaml.io/blog/eda-simple-event-processing>
----
2019-03-03 11:39:29 UTC - Ben S: Thank you, this looks interesting
----
2019-03-03 11:46:11 UTC - Yuvaraj Loganathan: Hi Team,
```
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.pulsar.client.api.*;
import schematest1.User;

import java.io.IOException;

public class SchemaConsumer {
    public static void main(String[] args) {
        PulsarClient client = null;
        Consumer<User> consumer = null;
        try {
            ReflectDatumReader<User> reader = new ReflectDatumReader<>(User.getClassSchema());
            client = PulsarClient.builder()
                    .serviceUrl("pulsar://pulsar.stage.xx.xx:6650")
                    .build();
            consumer = client.newConsumer(Schema.AVRO(User.class))
                    .topic("persistent://public/default/schema-test1")
                    .subscriptionType(SubscriptionType.Failover)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionName("test1")
                    .subscribe();
            Message<User> msg = consumer.receive();
            consumer.close();
            client.close();
        } catch (PulsarClientException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```
How can I deserialize `msg` into a `User` object?
----
2019-03-03 11:48:35 UTC - Ali Ahmed: `msg.getValue()`
----
2019-03-03 11:50:45 UTC - Yuvaraj Loganathan: @Ali Ahmed Thanks! Will raise a pull request for the docs! :slightly_smiling_face:
+1 : Sijie Guo
----
2019-03-03 15:40:03 UTC - Yuvaraj Loganathan: Hi Team,
```
import pulsar
from pulsar import ConsumerType, MessageId
from pulsar.schema import Record, String, Integer, AvroSchema
from schematest1.User import User

client = pulsar.Client('pulsar://pulsar.stage.xx.xx:6650')
consumer = client.subscribe('persistent://public/default/schema-test1', 'my-subscription1',
                            consumer_type=ConsumerType.Failover, schema=AvroSchema(User))
consumer.seek(MessageId.earliest)
while True:
    msg = consumer.receive()
    print(msg.data())
consumer.close()
client.close()
```
I'm trying to consume Avro messages from a topic produced by a Java producer, and I'm receiving the exception `Pulsar error: IncompatibleSchema`.
Python schema:
```
from pulsar.schema import Record, String, Integer, AvroSchema

class User(Record):
    name = String()
    favorite_number = Integer()
    favorite_color = String()
    age = Integer(default=18)
```
Here is the Avro schema:
```
{
  "namespace": "schematest1",
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "favorite_number", "type": [ "int", "null" ] },
    { "name": "favorite_color", "type": "string" },
    { "name": "age", "type": "int", "default": 18 }
  ]
}
```
----
2019-03-03 15:54:16 UTC - Matteo Merli: @Yuvaraj Loganathan Is this schema coming from the Java POJO?
----
2019-03-03 15:56:49 UTC - Matteo Merli: the differences that are probably affecting this are that `favorite_color` and `age` don't have the “null” type option. Can you try with `favorite_color = String(required=True)` and the same for age?
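Since the only signal here is `IncompatibleSchema`, with no indication of which field is at fault, it can help to diff the field types of the two record schemas directly. A small stdlib-only sketch, assuming both schemas are available as JSON (for example the local `.avsc` and the `data` string from the broker's schema endpoint); `diff_fields` is a hypothetical helper, not part of the Pulsar client:

```python
import json


def _load(schema):
    """Accept an Avro schema as a JSON string or an already-parsed dict."""
    return json.loads(schema) if isinstance(schema, str) else schema


def diff_fields(schema_a, schema_b):
    """Return {field: (type_in_a, type_in_b)} for record fields whose types differ.

    A field missing from one side shows up with None on that side. Union
    branch order is compared as-is, because Avro's binary encoding writes the
    index of the union branch, so ["int", "null"] and ["null", "int"] are not
    byte-compatible without schema resolution.
    """
    types_a = {f["name"]: f["type"] for f in _load(schema_a)["fields"]}
    types_b = {f["name"]: f["type"] for f in _load(schema_b)["fields"]}
    diffs = {}
    for name in sorted(set(types_a) | set(types_b)):
        if types_a.get(name) != types_b.get(name):
            diffs[name] = (types_a.get(name), types_b.get(name))
    return diffs


if __name__ == "__main__":
    local_avsc = {
        "namespace": "schematest1", "type": "record", "name": "User",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "favorite_number", "type": ["int", "null"]},
            {"name": "favorite_color", "type": "string"},
            {"name": "age", "type": "int", "default": 18},
        ],
    }
    # The "data" string from the broker's schema response later in this thread.
    broker_data = (
        '{"type":"record","name":"User","namespace":"schematest1","fields":['
        '{"name":"name","type":["null","string"],"default":null},'
        '{"name":"favorite_number","type":["null","int"],"default":null},'
        '{"name":"favorite_color","type":["null","string"],"default":null},'
        '{"name":"age","type":"int"}]}'
    )
    for field, (a, b) in diff_fields(local_avsc, broker_data).items():
        print(f"{field}: {a} vs {b}")
```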
----
2019-03-03 16:55:39 UTC - Yuvaraj Loganathan: Yes
----
2019-03-03 16:55:58 UTC - Yuvaraj Loganathan: Here is the broker's schema GET response for the topic:
```
{
  "version": 0,
  "type": "AVRO",
  "timestamp": 0,
  "data": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"schematest1\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"favorite_number\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"favorite_color\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":\"int\"}]}",
  "properties": {}
}
```
----
2019-03-03 16:59:01 UTC - Yuvaraj Loganathan: Still the same response
```
from pulsar.schema import Record, String, Integer, AvroSchema

class User(Record):
    name = String(required=True)
    favorite_number = Integer()
    favorite_color = String(required=True)
    age = Integer(default=18, required=True)
```
----
2019-03-03 17:01:26 UTC - Yuvaraj Loganathan: is it possible to get the schema from the message so that we can manually decode the bytes?
----
2019-03-03 17:02:28 UTC - Matteo Merli: Yes, `User.schema()` should get you that
----
2019-03-03 17:05:50 UTC - Matteo Merli: (maybe I misunderstood your question: currently it doesn’t do any “auto-deserialize” kind of thing..)
----
2019-03-03 17:06:00 UTC - Yuvaraj Loganathan: `{'name': 'User', 'type': 'record', 'fields': [{'name': 'age', 'type': 'int'}, {'name': 'favorite_color', 'type': 'string'}, {'name': 'favorite_number', 'type': ['null', 'int']}, {'name': 'name', 'type': 'string'}]}`
This is the response
----
2019-03-03 17:07:05 UTC - Yuvaraj Loganathan: the namespace is missing in the `User.schema()` output. Is there a way to define the namespace in Python?
----
2019-03-03 17:09:11 UTC - Matteo Merli: No, but that doesn’t break the Avro validation. We have tests for these cases between Java & Python
----
2019-03-03 17:10:53 UTC - Matteo Merli: e.g.: <https://github.com/apache/pulsar/blob/e9a5e61f06db9780669b39a96c5c29428334a0fe/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java> <https://github.com/apache/pulsar/blob/master/tests/docker-images/latest-version-image/python-examples/producer_schema.py>
----
2019-03-03 17:14:42 UTC - Yuvaraj Loganathan: Thanks @Matteo Merli, will go through them and come back
----
2019-03-03 17:15:20 UTC - Guy Feldman: FWIW I also had some issues with incompatible schema messages
----
2019-03-03 17:16:00 UTC - Matteo Merli: Indeed there might be some uncovered corners :slightly_smiling_face:
----
2019-03-03 17:16:22 UTC - Guy Feldman: I ended up resolving it by hitting the schema endpoints when creating the topic
----
2019-03-03 17:16:35 UTC - Matteo Merli: @Yuvaraj Loganathan can you share the Java POJO as well, to create a test case for that?
----
2019-03-03 17:20:18 UTC - Guy Feldman: @Yuvaraj Loganathan you also may want to add a default value of `null` for the favorite number field
----
2019-03-03 17:20:31 UTC - Guy Feldman: if you want it to be optional
----
2019-03-03 17:20:52 UTC - Matteo Merli: that shouldn’t be required, ideally..
----
2019-03-03 17:23:17 UTC - Guy Feldman: yeah I guess it's not
----
2019-03-03 17:24:21 UTC - Yuvaraj Loganathan: I first wrote the Avro schema file, then generated the Java classes from it and gave the generated POJO to the Pulsar producer.
----
2019-03-03 17:26:42 UTC - Matteo Merli: the thing is that Avro doesn’t tell *why* it’s not compatible :confused:
----
2019-03-03 17:27:13 UTC - Yuvaraj Loganathan:
```
from pulsar.schema import Record, String, Integer, AvroSchema

class User(Record):
    name = String()
    favorite_number = Integer()
    favorite_color = String()
    age = Integer(default=18, required=True)
```
This is working for me.
The broker schema response is now:
```
{
  "version": 0,
  "type": "AVRO",
  "timestamp": 0,
  "data": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"schematest1\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"favorite_number\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"favorite_color\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":\"int\"}]}",
  "properties": {}
}
```
----
2019-03-03 17:28:35 UTC - Yuvaraj Loganathan: The `User.schema()` response is:
```
{"name": "User", "type": "record", "fields": [{"name": "age", "type": "int"}, {"name": "favorite_color", "type": ["null", "string"]}, {"name": "favorite_number", "type": ["null", "int"]}, {"name": "name", "type": ["null", "string"]}]}
```
----
2019-03-03 17:29:17 UTC - Matteo Merli: I guess that when a field has `default=xyz` we could automatically treat it as `required=True`, because the value will always be there?
----
2019-03-03 17:30:42 UTC - Yuvaraj Loganathan: Now I'm receiving this error:
```
Traceback (most recent call last):
  File "/home/uva/PycharmProjects/learn/AvroConsumer.py", line 16, in <module>
    msg.value()
  File "/home/uva/yyyy/sources/xxxxx/learn/lib/python3.5/site-packages/pulsar/__init__.py", line 156, in value
    return self._schema.decode(self._message.data())
  File "/home/uva/beam/sources/aws-sc-worker/learn/lib/python3.5/site-packages/pulsar/schema/schema.py", line 103, in decode
    d = fastavro.schemaless_reader(buffer, self._schema)
  File "fastavro/_read.pyx", line 763, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 773, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 564, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 460, in fastavro._read.read_record
  File "fastavro/_read.pyx", line 562, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 435, in fastavro._read.read_union
IndexError: list index out of range
```
----
2019-03-03 17:32:09 UTC - Yuvaraj Loganathan: Yes!
----
2019-03-03 17:32:38 UTC - Yuvaraj Loganathan: Once I matched the broker schema and `User.schema()`, the IncompatibleSchema error went away, but the error above appears instead.
----
2019-03-03 17:33:08 UTC - Matteo Merli: That I haven’t seen yet
----
2019-03-03 17:33:29 UTC - Matteo Merli: I’ll try with the above schema definitions later
----
2019-03-03 17:38:52 UTC - Guy Feldman: @Yuvaraj Loganathan I don't know if this is related, but are you using the Avro Maven plugin?
----
2019-03-03 17:39:01 UTC - Yuvaraj Loganathan: @Guy Feldman Yes
----
2019-03-03 17:39:14 UTC - Guy Feldman: Did you define a string type configuration?
----
2019-03-03 17:40:08 UTC - Guy Feldman: nm it shouldn't matter
----
2019-03-03 17:40:15 UTC - Yuvaraj Loganathan: No.
----
2019-03-03 17:40:29 UTC - Yuvaraj Loganathan: I have generated the POJO using the Avro Maven plugin
----
2019-03-03 17:40:34 UTC - Guy Feldman: by default the Maven plugin uses character arrays
----
2019-03-03 17:40:45 UTC - Guy Feldman: you have to tell it to use a string object
----
2019-03-03 17:40:48 UTC - Yuvaraj Loganathan: Ah..
----
2019-03-03 17:41:09 UTC - Yuvaraj Loganathan: Also, my schema is:
```
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "favorite_number", "type": [ "int", "null" ] },
    { "name": "favorite_color", "type": "string" },
    { "name": "age", "type": "int", "default": 18 }
  ]
}
```
----
2019-03-03 17:41:11 UTC - Guy Feldman: but that's serde on the Java side. I don't think it should matter
----
2019-03-03 17:41:26 UTC - Matteo Merli: can you also share the generated Java POJO?
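The thread doesn't settle the cause of this `IndexError`, but one thing worth checking (an assumption, not confirmed here) is field order. Avro's binary encoding carries no field names: a record is its field values concatenated in schema order, and a union is a branch index followed by the value. The `User.schema()` output above lists fields as age, favorite_color, favorite_number, name, while the broker schema has name, favorite_number, favorite_color, age, and the traceback shows the client calling `fastavro.schemaless_reader(buffer, self._schema)`, i.e. using the local schema as the writer schema. The toy codec below is a tiny stdlib-only subset of Avro (not fastavro or Pulsar code) showing how decoding bytes with a reordered schema can fail with the same kind of `IndexError` in a union.

```python
def write_long(n: int) -> bytes:
    """Avro int/long encoding: zig-zag, then little-endian base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def read_long(buf: bytes, pos: int):
    """Decode one zig-zag varint starting at `pos`; return (value, new_pos)."""
    shift = result = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return (result >> 1) ^ -(result & 1), pos
        shift += 7


def encode(avro_type, value) -> bytes:
    """Encode int, string, null, or a ["null", X]-style union."""
    if isinstance(avro_type, list):  # union: branch index, then the value
        branch = "null" if value is None else next(t for t in avro_type if t != "null")
        return write_long(avro_type.index(branch)) + encode(branch, value)
    if avro_type == "null":
        return b""
    if avro_type == "int":
        return write_long(value)
    if avro_type == "string":
        data = value.encode("utf-8")
        return write_long(len(data)) + data
    raise ValueError(f"unsupported type {avro_type!r}")


def decode(avro_type, buf: bytes, pos: int):
    """Inverse of encode(); return (value, new_pos)."""
    if isinstance(avro_type, list):
        index, pos = read_long(buf, pos)
        if not 0 <= index < len(avro_type):
            raise IndexError(f"union branch {index} out of range")
        return decode(avro_type[index], buf, pos)
    if avro_type == "null":
        return None, pos
    if avro_type == "int":
        return read_long(buf, pos)
    if avro_type == "string":
        length, pos = read_long(buf, pos)
        return buf[pos:pos + length].decode("utf-8"), pos + length
    raise ValueError(f"unsupported type {avro_type!r}")


def encode_record(fields, record) -> bytes:
    # No field names or tags are written: only the values, in schema order.
    return b"".join(encode(f["type"], record[f["name"]]) for f in fields)


def decode_record(fields, buf: bytes) -> dict:
    out, pos = {}, 0
    for f in fields:
        out[f["name"]], pos = decode(f["type"], buf, pos)
    return out


if __name__ == "__main__":
    writer = [{"name": "name", "type": ["null", "string"]},
              {"name": "favorite_number", "type": ["null", "int"]},
              {"name": "favorite_color", "type": ["null", "string"]},
              {"name": "age", "type": "int"}]
    reader = sorted(writer, key=lambda f: f["name"])  # same fields, alphabetical
    data = encode_record(writer, {"name": "alice", "favorite_number": 7,
                                  "favorite_color": "red", "age": 18})
    print(decode_record(writer, data))
    try:
        decode_record(reader, data)
    except IndexError as exc:
        print("reordered schema:", exc)
```

Decoding with the writer's field order round-trips, which is why the schemas need to match field for field, including order, whenever the decoder uses a single schema for the bytes.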
----
2019-03-03 17:43:09 UTC - Guy Feldman: gonna try a Python consumer with my topics which use Avro
----
2019-03-03 17:48:48 UTC - Yuvaraj Loganathan: @Matteo Merli <https://gist.github.com/skyrocknroll/dcf73dfb1940695d1ad742f069ce9a9f>
+1 : Matteo Merli
----
2019-03-03 17:56:24 UTC - Sijie Guo: there is a problem with the current Avro POJO schema
----
2019-03-03 17:59:25 UTC - Sijie Guo: in the Java Avro POJO schema, it always converts the fields into nullable ones. <https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java#L103>
----
2019-03-03 18:00:23 UTC - Matteo Merli: Oh, I thought that was the Avro default setting
----
2019-03-03 18:00:28 UTC - Sijie Guo: If you are using a schema where fields don’t use nullable unions, you will have incompatibility problems.
----
2019-03-03 18:00:29 UTC - Sijie Guo: I think AllowNull was added by mistake
----
2019-03-03 18:00:32 UTC - Matteo Merli: that’s why I implemented the same behavior in Python
----
2019-03-03 18:02:05 UTC - Sijie Guo: that’s not correct. it is actually causing a lot of confusion. @CongBo or @Penghui Li found the problem and they said they will send out a fix for it.
----
2019-03-03 18:02:26 UTC - Sijie Guo: We should just let Avro handle the schema generation
----
2019-03-03 18:02:35 UTC - Sijie Guo: we don’t need to add AllowNull
----
2019-03-03 18:02:36 UTC - Matteo Merli: uhm, the fix will likely be incompatible
----
2019-03-03 18:02:50 UTC - Sijie Guo: yes
----
2019-03-03 18:03:05 UTC - Sijie Guo: but it’s already in a very weird state
----
2019-03-03 18:03:06 UTC - Sijie Guo: :disappointed:
----
2019-03-03 18:03:54 UTC - Yuvaraj Loganathan: I'm seeing the same behaviour: compared with my schema file, the broker-registered schema has an extra null.
----
2019-03-03 18:04:14 UTC - Sijie Guo: yes
----
2019-03-03 18:04:42 UTC - Sijie Guo: the extra null was introduced by `AllowNull`. `AllowNull` converts the schema type into a union type.
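To make the `AllowNull` effect concrete, here is a rough Python model of it (an illustration only, not Pulsar's or Avro's code): each field type is wrapped in a `["null", ...]` union, an existing union that already admits null is left alone, and, matching the broker schema in this thread, the wrapped fields carry `"default": null`. `allow_null` and its `keep_non_null` parameter are hypothetical; `keep_non_null` stands in for Java primitive fields such as `int age`, which cannot hold null and stay a plain `"int"` in the broker schema.

```python
import copy


def make_nullable(avro_type):
    """Return avro_type as a union that admits null, prepending "null" if needed."""
    if isinstance(avro_type, list):
        return avro_type if "null" in avro_type else ["null"] + avro_type
    return ["null", avro_type]


def allow_null(record_schema, keep_non_null=()):
    """Copy of an Avro record schema with every field made nullable.

    `keep_non_null` names fields to leave untouched (standing in for Java
    primitive fields). Nullable fields get "default": None, which is
    serialized as null in JSON.
    """
    result = copy.deepcopy(record_schema)
    for field in result["fields"]:
        if field["name"] in keep_non_null:
            continue
        field["type"] = make_nullable(field["type"])
        field["default"] = None
    return result


if __name__ == "__main__":
    import json

    reflected = {"type": "record", "name": "User", "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": "int"},
        {"name": "favorite_color", "type": "string"},
        {"name": "age", "type": "int"},
    ]}
    print(json.dumps(allow_null(reflected, keep_non_null={"age"})))
```

Applied to fields reflected as plain `string`/`int` types, this yields the `["null", "string"]` plus `"default": null` pattern seen in the broker-registered schema, which is why a Python `Record` with non-nullable fields no longer matched it.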
----
2019-03-03 18:04:51 UTC - Sijie Guo: s/schema type/field type
----
2019-03-03 18:05:59 UTC - Sijie Guo: basically, there is currently no way to specify non-null fields in Pulsar’s Avro POJO schema.
----
2019-03-03 18:06:15 UTC - Sijie Guo: a safe way to use AvroSchema is to use GenericSchema
----
2019-03-03 18:06:19 UTC - Yuvaraj Loganathan: Also I do see `default: null` in the broker-registered schema for all the string fields
----
2019-03-03 18:06:43 UTC - Sijie Guo: where you can use Avro to build the schema, to get around the POJO schema issue
----
2019-03-03 18:07:27 UTC - Yuvaraj Loganathan: Ok, thanks @Sijie Guo
----
2019-03-03 18:07:48 UTC - Yuvaraj Loganathan: <https://pastebin.com/NyNcbse0> here is the broker schema for this Avro schema <https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1551634869148400>
----
2019-03-03 18:08:05 UTC - Sijie Guo: @Matteo Merli - for BC, we can rename the AVRO one to AllowNullAvro
----
2019-03-03 18:08:28 UTC - Sijie Guo: and just introduce a new AVRO schema which doesn’t have AllowNull
----
2019-03-03 18:08:31 UTC - Sijie Guo: sorry
----
2019-03-03 18:08:34 UTC - Sijie Guo: keep the current one
----
2019-03-03 18:08:36 UTC - Matteo Merli: but, in terms of Avro auto-generated code, we should extract the same schema as well. So how does one define a “nullable” field in the POJO?
----
2019-03-03 18:08:39 UTC - Sijie Guo: just introduce a new one
----
2019-03-03 18:09:01 UTC - Matteo Merli: yes, though having 2 is not ideal either :confused:
+1 : Yuvaraj Loganathan
----
2019-03-03 18:09:27 UTC - Sijie Guo: > So how does one define a “nullable” field in the pojo
@Matteo Merli: currently it is always ‘nullable’ because of AllowNull
----
2019-03-03 18:09:51 UTC - Matteo Merli: maybe an internal versioning of our Avro handling, to allow migrating to the new type
----
2019-03-03 18:10:05 UTC - Sijie Guo: AllowNull is just a nightmare; @Penghui Li and @CongBo spent quite a lot of time troubleshooting the issue :slightly_smiling_face:
----
2019-03-03 18:11:08 UTC - Matteo Merli: (yes, and that combined with no specific errors from Avro..)
----
2019-03-03 18:13:12 UTC - Sijie Guo: @Matteo Merli: I think it might be worth checking with @Jerry Peng whether there are other considerations regarding AllowNull (based on what I learned, it is not needed). <https://github.com/apache/pulsar/pull/1917>
----
2019-03-03 18:15:46 UTC - Matteo Merli: Yes
----
2019-03-03 18:16:35 UTC - Matteo Merli: surely there will be some reason. Let’s check out the options there
----
2019-03-03 19:43:18 UTC - Guy Feldman: I had trouble serializing a dictionary into this type using the standard Avro library
----
2019-03-03 19:44:19 UTC - Guy Feldman: the default value for age doesn't work
----
2019-03-03 19:44:33 UTC - Guy Feldman: when I specified an age it worked fine
----
2019-03-03 19:49:53 UTC - Guy Feldman: I think a more general solution is to have the user provide a class with the schema method implemented.
----
2019-03-03 19:50:44 UTC - Guy Feldman: This way they can parse the Avro schema with the Python avro library
----
2019-03-03 19:50:51 UTC - Guy Feldman: or provide their own
----
2019-03-03 19:51:15 UTC - Matteo Merli: though that won’t guarantee that the data complies with that schema
----
2019-03-03 19:51:42 UTC - Guy Feldman:
----
2019-03-03 19:52:18 UTC - Matteo Merli: :slightly_smiling_face:
----
2019-03-03 19:52:39 UTC - Guy Feldman: alternatively, we can use the Python avro library for reading the schemas
----
2019-03-03 19:53:14 UTC - Matteo Merli: reading the JSON?
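Guy's observation about `age` is consistent with the Avro specification: a field's `default` is used when *reading* data that lacks the field (schema evolution), and it does not make the field optional when writing. If you want schema defaults applied before serializing a plain dict, you have to fill them in yourself. A minimal stdlib-only sketch; `fill_defaults` is a hypothetical helper, not part of the avro or Pulsar libraries:

```python
import json

USER_AVSC = json.loads("""
{
  "namespace": "schematest1", "type": "record", "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": "string"},
    {"name": "age", "type": "int", "default": 18}
  ]
}
""")


def fill_defaults(record: dict, schema: dict) -> dict:
    """Return a copy of `record` with missing fields taken from schema defaults.

    A field absent from `record` that has no "default" in the schema raises
    KeyError naming that field.
    """
    filled = dict(record)
    for field in schema["fields"]:
        name = field["name"]
        if name not in filled:
            if "default" not in field:
                raise KeyError(f"missing field {name!r} has no default")
            filled[name] = field["default"]
    return filled


if __name__ == "__main__":
    print(fill_defaults({"name": "alice", "favorite_number": None,
                         "favorite_color": "red"}, USER_AVSC))
```

This only fills gaps; it does not check that values match the declared types, which is the concern Matteo raises about user-supplied schemas.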
----
2019-03-03 19:53:22 UTC - Guy Feldman: that's what I'm doing, because my schemas also include Avro imports
----
2019-03-03 19:53:48 UTC - Guy Feldman: yeah
----
2019-03-03 19:53:50 UTC - Guy Feldman: `from avro.schema import SchemaFromJSONData as make_avsc_object`
----
2019-03-03 19:54:16 UTC - Guy Feldman: then I just use the `to_json` method to get the JSON schema
----
2019-03-04 02:09:30 UTC - Byron: Hi folks. I am observing that the `Reader.HasNext` method in the Go client can return true even though there isn’t another message available. It may be my misunderstanding of the semantics of the method.
----
2019-03-04 02:13:06 UTC - Byron: My assumption is that this is a point-in-time lookup of whether the message is the latest, and that it returns false if no more messages have been received and committed by the server.
----
2019-03-04 08:07:36 UTC - Sijie Guo: +1 we should probably allow users to pass in an Avro schema :slightly_smiling_face:
----
2019-03-04 08:09:00 UTC - Sijie Guo: @Matteo Merli @Yuvaraj Loganathan: I created a GitHub issue for the AllowNull problem (<https://github.com/apache/pulsar/issues/3741>). @CongBo @Penghui Li are working on the fix
----
2019-03-04 08:09:41 UTC - Yuvaraj Loganathan: Awesome! Thank you!
----
