gunli commented on code in PR #1071:
URL: https://github.com/apache/pulsar-client-go/pull/1071#discussion_r1276240567


##########
pulsar/producer_partition.go:
##########
@@ -1121,78 +958,301 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
        p.internalSendAsync(ctx, msg, callback, false)
 }
 
-func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
-       callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
        if msg == nil {
-               p.log.Error("Message is nil")
-               runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil"))
-               return
+               return newError(InvalidMessage, "Message is nil")
        }
 
        if msg.Value != nil && msg.Payload != nil {
-               p.log.Error("Can not set Value and Payload both")
-               runCallback(callback, nil, msg, newError(InvalidMessage, "Can not set Value and Payload both"))
-               return
+               return newError(InvalidMessage, "Can not set Value and Payload both")
        }
 
-       // Register transaction operation to transaction and the transaction coordinator.
-       var newCallback func(MessageID, *ProducerMessage, error)
-       var txn *transaction
-       if msg.Transaction != nil {
-               transactionImpl := (msg.Transaction).(*transaction)
-               txn = transactionImpl
-               if transactionImpl.state != TxnOpen {
-                       p.log.WithField("state", transactionImpl.state).Error("Failed to send message" +
-                               " by a non-open transaction.")
-                       runCallback(callback, nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
-                       return
+       if p.options.DisableMultiSchema {
+               if msg.Schema != nil && p.options.Schema != nil &&
+                       msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
+                       p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
+                       return fmt.Errorf("msg schema can not match with producer schema")
                }
+       }
 
-               if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
-                       runCallback(callback, nil, msg, err)
-                       return
+       return nil
+}
+
+func (p *partitionProducer) updateSchema(sr *sendRequest) error {
+       var schema Schema
+       var schemaVersion []byte
+       var err error
+
+       if sr.msg.Schema != nil {
+               schema = sr.msg.Schema
+       } else if p.options.Schema != nil {
+               schema = p.options.Schema
+       }
+
+       if schema == nil {
+               return nil
+       }
+
+       schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
+       if schemaVersion == nil {
+               schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
+               if err != nil {
+                       return fmt.Errorf("get schema version fail, err: %w", err)
                }
-               if err := transactionImpl.registerSendOrAckOp(); err != nil {
-                       runCallback(callback, nil, msg, err)
-                       return
+               p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
+       }
+
+       sr.schema = schema
+       sr.schemaVersion = schemaVersion
+       return nil
+}
+
+func (p *partitionProducer) updateUncompressPayload(sr *sendRequest) error {

Review Comment:
   OK, I will rename it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to