Denovo1998 opened a new issue, #19113:
URL: https://github.com/apache/pulsar/issues/19113

   ### Motivation
   
   A consumer created with a concrete schema fails to subscribe after an 
AUTO_CONSUME consumer has subscribed to the same empty topic. The Broker rejects 
it with IncompatibleSchemaException("Topic does not have schema to check"). 
This proposal fixes that failure.
   
https://github.com/apache/pulsar/blob/ed33fb399e661e4d47baeaaa8d0cdb3bfadc9546/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1037
   
https://github.com/apache/pulsar/blob/ed33fb399e661e4d47baeaaa8d0cdb3bfadc9546/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1147-L1152
   
https://github.com/apache/pulsar/blob/ed33fb399e661e4d47baeaaa8d0cdb3bfadc9546/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3054-L3071
   
https://github.com/apache/pulsar/blob/ed33fb399e661e4d47baeaaa8d0cdb3bfadc9546/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java#L1162-L1177
   The Broker should track whether any active consumer of the Topic has a 
SchemaType other than AUTO_CONSUME.
   
   ### Goal
   
   1. On the client side, the AUTO_CONSUME schema should be uploaded to the 
Broker.
   
https://github.com/apache/pulsar/blob/36cff70575745f98c7fb4f3a345d21404f5504fc/pulsar-common/src/main/proto/PulsarApi.proto#L25-L55
   This requires adding `AutoConsume = -3;` to the `Schema.Type` enum.
   2. On the broker side, record SchemaType in 
`org.apache.pulsar.broker.service.Consumer`.
   
   ### API Changes
   
   Add SchemaType in Consumer
   ```
           private final SchemaType schemaType;
   ```
   Add AutoConsume = -3; in PulsarApi.proto
   ```
   message Schema {
       enum Type {
           None = 0;
           String = 1;
           Json = 2;
           Protobuf = 3;
           Avro = 4;
           Bool = 5;
           Int8 = 6;
           Int16 = 7;
           Int32 = 8;
           Int64 = 9;
           Float = 10;
           Double = 11;
           Date = 12;
           Time = 13;
           Timestamp = 14;
           KeyValue = 15;
           Instant = 16;
           LocalDate = 17;
           LocalTime = 18;
           LocalDateTime = 19;
           ProtobufNative = 20;
           AutoConsume = -3;
       }
   
       required string name = 1;
       required bytes schema_data = 3;
       required Type type = 4;
       repeated KeyValue properties = 5;
   }
   ```
   In getSchemaType(), AUTO_CONSUME is no longer mapped to NONE.
   ```
       private static Schema.Type getSchemaType(SchemaType type) {
           if (type.getValue() < 0 && type.getValue() != -3) {
               return Schema.Type.None;
           } else {
               return Schema.Type.valueOf(type.getValue());
           }
       }
   
       public static SchemaType getSchemaType(Schema.Type type) {
           if (type.getValue() < 0 && type.getValue() != -3) {
               // this is unexpected
               return SchemaType.NONE;
           } else {
               return SchemaType.valueOf(type.getValue());
           }
       }
   ```
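
The mapping rule above can be sketched in isolation. This is a minimal model, not the client code: it works on raw `int` values rather than the real `SchemaType` and `Schema.Type` enums, and the class name `SchemaTypeMappingSketch` and method name `toWireValue` are hypothetical.

```java
// Hypothetical stand-in for the value-based mapping described above.
// It uses raw int values instead of the real SchemaType / Schema.Type enums.
final class SchemaTypeMappingSketch {
    static final int NONE = 0;           // Schema.Type.None in the proto above
    static final int AUTO_CONSUME = -3;  // proposed Schema.Type.AutoConsume

    // Negative values other than AUTO_CONSUME collapse to NONE;
    // every other value passes through unchanged.
    static int toWireValue(int schemaTypeValue) {
        if (schemaTypeValue < 0 && schemaTypeValue != AUTO_CONSUME) {
            return NONE;
        }
        return schemaTypeValue;
    }

    public static void main(String[] args) {
        for (int value : new int[] {-4, -3, -1, 2}) {
            System.out.println(value + " -> " + toWireValue(value));
        }
    }
}
```

Only -3 survives among the negative values, which is what lets the Broker tell an AUTO_CONSUME consumer apart from a consumer that sent no schema.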
   
   ### Implementation
   
   In `org.apache.pulsar.broker.service.ServerCnx#handleSubscribe`
   ```
       SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
               .subscriptionName(subscriptionName)
               .consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
               .consumerName(consumerName).isDurable(isDurable)
               .startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted)
               .initialPosition(initialPosition)
               .startMessageRollbackDurationSec(startMessageRollbackDurationSec)
               .replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
               .subscriptionProperties(subscriptionProperties)
               .consumerEpoch(consumerEpoch)
               .schemaType(schema == null ? null : schema.getType())
               .build();
       if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
           return topic.addSchemaIfIdleOrCheckCompatible(schema)
                   .thenCompose(v -> topic.subscribe(option));
       } else {
           return topic.subscribe(option);
       }
   ```
   In `org.apache.pulsar.broker.service.persistent.PersistentTopic` and 
`org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic`, pass the 
SchemaType through `subscribe` and record it in 
`org.apache.pulsar.broker.service.Consumer`.
   ```
       @Override
       public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
           return internalSubscribe(option.getCnx(), option.getSubscriptionName(), option.getConsumerId(),
                   option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(),
                   option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(),
                   option.getInitialPosition(), option.getStartMessageRollbackDurationSec(),
                   option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
                   option.getSubscriptionProperties().orElse(Collections.emptyMap()),
                   option.getConsumerEpoch(), option.getSchemaType());
       }
   
       private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
                                                             long consumerId, SubType subType, int priorityLevel,
                                                             String consumerName, boolean isDurable,
                                                             MessageId startMessageId,
                                                             Map<String, String> metadata, boolean readCompacted,
                                                             InitialPosition initialPosition,
                                                             long startMessageRollbackDurationSec,
                                                             boolean replicatedSubscriptionStateArg,
                                                             KeySharedMeta keySharedMeta,
                                                             Map<String, String> subscriptionProperties,
                                                             long consumerEpoch,
                                                             SchemaType schemaType) {
       ......
           CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
               Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
                       consumerName, isDurable, cnx, cnx.getAuthRole(), metadata,
                       readCompacted, keySharedMeta, startMessageId, consumerEpoch,
                       schemaType == null ? SchemaType.BYTES : schemaType);
       }
   
       @Override
       public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,
                                                    SubType subType, int priorityLevel, String consumerName,
                                                    boolean isDurable, MessageId startMessageId,
                                                    Map<String, String> metadata, boolean readCompacted,
                                                    InitialPosition initialPosition,
                                                    long startMessageRollbackDurationSec,
                                                    boolean replicatedSubscriptionStateArg,
                                                    KeySharedMeta keySharedMeta,
                                                    SchemaType schemaType) {
           return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
                   isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec,
                   replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, schemaType);
       }
   ```
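   In `addSchemaIfIdleOrCheckCompatible`, count only the active consumers whose 
SchemaType is not AUTO_CONSUME. If there is at least one (or the Topic already 
has a schema, producers, or data), call `checkSchemaCompatibleForConsumer`; 
otherwise add the schema.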
   ```
       @Override
       public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
           return hasSchema().thenCompose((hasSchema) -> {
               int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
                       .mapToInt(subscription -> subscription.getConsumers().stream()
                               .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
                               .toList().size())
                       .sum();
               if (hasSchema
                       || (!producers.isEmpty())
                       || (numActiveConsumersWithoutAutoSchema != 0)
                       || (ledger.getTotalSize() != 0)) {
                   return checkSchemaCompatibleForConsumer(schema);
               } else {
                   return addSchema(schema).thenCompose(schemaVersion ->
                           CompletableFuture.completedFuture(null));
               }
           });
       }
   ```
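
To make the intended behavior concrete, here is a minimal, self-contained simulation of the subscribe path on an empty topic. It is only a sketch under stated assumptions: `SketchTopic`, `SketchSchemaType`, and `Outcome` are hypothetical stand-ins, not Broker classes. The case where the consumer sends no schema is left out, and the producer count and ledger size are fixed at zero to model an empty, idle topic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; these are not the real Pulsar broker types.
enum SketchSchemaType { JSON, AVRO, AUTO_CONSUME }

enum Outcome { SKIPPED, SCHEMA_ADDED, COMPATIBILITY_CHECKED }

final class SketchTopic {
    private final List<SketchSchemaType> activeConsumers = new ArrayList<>();
    private boolean hasSchema = false;
    private final int producerCount = 0; // empty, idle topic in this sketch
    private final long ledgerSize = 0;   // no data written yet

    // Mirrors the proposed condition: AUTO_CONSUME consumers are not counted.
    boolean mustCheckCompatibility() {
        long nonAutoConsumers = activeConsumers.stream()
                .filter(type -> type != SketchSchemaType.AUTO_CONSUME)
                .count();
        return hasSchema || producerCount != 0 || nonAutoConsumers != 0 || ledgerSize != 0;
    }

    // Mirrors the handleSubscribe branch: AUTO_CONSUME bypasses the schema step.
    Outcome subscribe(SketchSchemaType type) {
        Outcome outcome;
        if (type == SketchSchemaType.AUTO_CONSUME) {
            outcome = Outcome.SKIPPED;
        } else if (mustCheckCompatibility()) {
            outcome = Outcome.COMPATIBILITY_CHECKED;
        } else {
            hasSchema = true; // the topic was idle, so the schema is registered
            outcome = Outcome.SCHEMA_ADDED;
        }
        activeConsumers.add(type);
        return outcome;
    }
}
```

In this model, an AUTO_CONSUME consumer subscribing first leaves the topic idle for schema purposes, so the next consumer with a concrete schema registers it instead of hitting the compatibility check against a topic that has no schema.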
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   PR: #17449

