2018-12-16 09:54:52 UTC - zero.xu: @zero.xu has joined the channel
----
2018-12-16 15:02:25 UTC - zero.xu: after read the code, I found the msg will be 
add async inot ledger in PersistentTopic impl, but how the consumer know the 
new msg coming? I can't find any code about this, ashamed about my terrible 
ability. but in NonPersistentTopic impl, the msg just go through 
subscriptions-> subscription -> dispatcher-> consumer->channel. who 
call show me the related code in PersistentTopic?
----
2018-12-16 15:25:38 UTC - jia zhai: Hi @zero.xu Consumer will send a “Flow” 
command to broker, broker will handle this command, and push data to Consumer. 
Please take a look at `handleFlow` command in `ServerCnx.java`, and 
`handleMessage` in ClientCnx.java
----
2018-12-16 15:26:55 UTC - zero.xu: thx!
----
2018-12-16 15:28:31 UTC - jia zhai: welcome
----
2018-12-16 15:29:19 UTC - Sijie Guo: @zero.xu: to add on what @jia zhai 
explained - the consumer is “reading”/“waiting for” entries, while the producer 
is writing the entries and on successfully appending entries, it will add the 
entries to ManagedLedger entries cache, which will then notify the consumers 
waiting for entries.
----
2018-12-16 15:52:14 UTC - zero.xu: I did not found any code aboud the notify 
action, can u show related code?
----
2018-12-16 15:53:07 UTC - zero.xu: I review the code: PersistentTopic 
ManagedLedgerImpl PersistentSubscription, can not found any notify action
----
2018-12-16 15:53:52 UTC - zero.xu: @Sijie Guo
----
2018-12-16 16:03:51 UTC - Matteo Merli: @zero.xu Take a look at 
<https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L413>
----
2018-12-16 16:04:21 UTC - Sijie Guo: @zero.xu check 
ManagedCursorImpl.asyncReadEntriesOrWait
----
2018-12-16 16:05:35 UTC - Matteo Merli: `cursor.asyncReadEntriesOrWait()` is 
the call that will register to get the next batch of messages. if the cursor is 
at the end of topic, it will asynchronosly wait until messages are available
+1 : zero.xu
----
2018-12-16 16:05:35 UTC - Sijie Guo: if a cursor is caught up, it will be added 
to the managed ledger’s `waitingCursors` list. when new entries appened, the 
cursors will be waken up to read the actual emtries from the cache.
----
2018-12-16 16:06:07 UTC - Sijie Guo: yeah @Matteo Merli is faster than me
+1 : zero.xu
----
2018-12-16 16:06:12 UTC - Matteo Merli: :slightly_smiling_face:
----
2018-12-17 00:26:41 UTC - jia zhai: :+1:
----
2018-12-17 01:13:01 UTC - zero.xu: @Matteo Merli @Sijie Guo after read the code 
carefully, I found the clue: PersistentTopic.publishMessage -&gt; 
ManagedLedgerImpl.asyncAddEntry -&gt; OpAddEntry.initiate -&gt; 
LedgerHandle.asyncAddEntry -&gt; OpAddEntry.addComplete+safeRun -&gt; 
ManagedLedgerImpl.notifyCursors -&gt; ManagedCursorImpl.notifyEntriesAvailable 
-&gt; ManagedLedgerImpl.asyncReadEntries -&gt; EntryCacheImpl.asyncReadEntry
----
2018-12-17 01:14:12 UTC - Matteo Merli: Yes, there’s is an optimization there 
to avoid contention between the threads managing the publishers and consumers
----
2018-12-17 01:14:43 UTC - Matteo Merli: With a retry logic to avoid the 
notification when the throughput is high enough
----
2018-12-17 07:07:20 UTC - linxin: @linxin has joined the channel
----
2018-12-17 07:26:17 UTC - linxin: I have read the official documentation on the 
Schema Registry and read the source code. I found that the Pulsar Broker only 
performs the Schema checking when the Producers and Consumers first connect to 
the Broker. The actual write process does not verify. What happens if the 
schema is deleted after the producer connects to the broker?
----
2018-12-17 07:28:39 UTC - linxin: I have read the official documentation on the 
Schema Registry and read the source code. I found that the Pulsar Broker only 
performs the Schema checking when the Producers and Consumers first connect to 
the Broker. The actual write process does not verify. What happens if the 
schema is deleted after the producer connects to the broker? @Sijie Guo
----
2018-12-17 07:42:39 UTC - Matteo Merli: You mean the schema gets created or 
deleted after the producer creation?
----
2018-12-17 07:44:12 UTC - linxin: @Matteo Merli Yes
----
2018-12-17 07:51:58 UTC - Matteo Merli: In case of deletion, I think we’re not 
taking any action at the moment 
----
2018-12-17 07:53:47 UTC - Matteo Merli: For creation, I believe that the topic 
should have been created with type bytes already, therefore it should not let 
you to set the schema at this point since it will be for a different schema type
----
2018-12-17 07:54:16 UTC - Matteo Merli: Just going from memory, I don’t have 
the code in front 
----
2018-12-17 08:10:23 UTC - linxin: @Matteo Merli I understand what you mean, 
thanks. And i have another question, why Pulsar schema is topic dimension, 
unlike Kafka, each message gets a schema id?
----
2018-12-17 08:14:46 UTC - linxin: What happens if the schema makes incompatible 
changes (such as delete a Topic schema and then re-creating one) and then 
register a new consumer with new schema trying to consume the old message with 
old schema?
----
2018-12-17 08:19:42 UTC - linxin: 1. Topic#setSchema(SchemaA); 2. send a 
messageA under SchemaA; 3. delete SchemaA and Topic#setSchema(SchemaB); 4. 
Consumer#subscribeTopicWith(SchemaB); How the consumer handles the messageA? My 
English is not very good. Can you understand what I mean?
----

Reply via email to