Gleiphir2769 commented on code in PR #1071: URL: https://github.com/apache/pulsar-client-go/pull/1071#discussion_r1279402063
########## pulsar/producer_partition.go: ########## @@ -1121,78 +958,301 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, p.internalSendAsync(ctx, msg, callback, false) } -func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, - callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { +func (p *partitionProducer) validateMsg(msg *ProducerMessage) error { if msg == nil { - p.log.Error("Message is nil") - runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil")) - return + return newError(InvalidMessage, "Message is nil") } if msg.Value != nil && msg.Payload != nil { - p.log.Error("Can not set Value and Payload both") - runCallback(callback, nil, msg, newError(InvalidMessage, "Can not set Value and Payload both")) - return + return newError(InvalidMessage, "Can not set Value and Payload both") } - // Register transaction operation to transaction and the transaction coordinator. - var newCallback func(MessageID, *ProducerMessage, error) - var txn *transaction - if msg.Transaction != nil { - transactionImpl := (msg.Transaction).(*transaction) - txn = transactionImpl - if transactionImpl.state != TxnOpen { - p.log.WithField("state", transactionImpl.state).Error("Failed to send message" + - " by a non-open transaction.") - runCallback(callback, nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction.")) - return + if p.options.DisableMultiSchema { + if msg.Schema != nil && p.options.Schema != nil && + msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() { + p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic) + return fmt.Errorf("msg schema can not match with producer schema") } + } - if err := transactionImpl.registerProducerTopic(p.topic); err != nil { - runCallback(callback, nil, msg, err) - return + return nil +} + +func (p *partitionProducer) updateSchema(sr 
*sendRequest) error { + var schema Schema + var schemaVersion []byte + var err error + + if sr.msg.Schema != nil { + schema = sr.msg.Schema + } else if p.options.Schema != nil { + schema = p.options.Schema + } + + if schema == nil { + return nil + } + + schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo()) + if schemaVersion == nil { + schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo()) + if err != nil { + return fmt.Errorf("get schema version fail, err: %w", err) } - if err := transactionImpl.registerSendOrAckOp(); err != nil { - runCallback(callback, nil, msg, err) - return + p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion) + } + + sr.schema = schema + sr.schemaVersion = schemaVersion + return nil +} + +func (p *partitionProducer) updateUncompressPayload(sr *sendRequest) error { + // read payload from message + sr.uncompressedPayload = sr.msg.Payload + + if sr.msg.Value != nil { + if sr.schema == nil { + p.log.Errorf("Schema encode message failed %s", sr.msg.Value) + return newError(SchemaFailure, "set schema value without setting schema") } - newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) { - runCallback(callback, id, producerMessage, err) - transactionImpl.endSendOrAckOp(err) + + // payload and schema are mutually exclusive + // try to get payload from schema value only if payload is not set + schemaPayload, err := sr.schema.Encode(sr.msg.Value) + if err != nil { + p.log.WithError(err).Errorf("Schema encode message failed %s", sr.msg.Value) + return newError(SchemaFailure, err.Error()) } + + sr.uncompressedPayload = schemaPayload + } + + sr.uncompressedSize = int64(len(sr.uncompressedPayload)) + return nil +} + +func (p *partitionProducer) updateMetaData(sr *sendRequest) { + deliverAt := sr.msg.DeliverAt + if sr.msg.DeliverAfter.Nanoseconds() > 0 { + deliverAt = time.Now().Add(sr.msg.DeliverAfter) + } + + sr.mm = p.genMetadata(sr.msg, int(sr.uncompressedSize), deliverAt) + + // set default 
ReplicationClusters when DisableReplication + if sr.msg.DisableReplication { + sr.msg.ReplicationClusters = []string{"__local__"} + } + + sr.sendAsBatch = !p.options.DisableBatching && + sr.msg.ReplicationClusters == nil && + deliverAt.UnixNano() < 0 + + if !sr.sendAsBatch { + // update sequence id for metadata, make the size of msgMetadata more accurate + // batch sending will update sequence ID in the BatchBuilder + p.updateMetadataSeqID(sr.mm, sr.msg) + } + + sr.deliverAt = deliverAt +} + +func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error { + checkSize := sr.uncompressedSize + if !sr.sendAsBatch { + sr.compressedPayload = p.compressionProvider.Compress(nil, sr.uncompressedPayload) + sr.compressedSize = len(sr.compressedPayload) + + // set the compress type in msgMetaData + compressionType := pb.CompressionType(p.options.CompressionType) + if compressionType != pb.CompressionType_NONE { + sr.mm.Compression = &compressionType + } + + checkSize = int64(sr.compressedSize) + } + + sr.maxMessageSize = int32(int64(p._getConn().GetMaxMessageSize())) Review Comment: > we must drop memLimit and the fixed length pending queue Sorry, I don't get the point of why we should drop the memLimit. It's a useful feature for users who lack resources. And I don't think the `fixed length pending queue` will become a problem for `SendAsync`. Why should we make it flexible? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org