Copilot commented on code in PR #277:
URL: 
https://github.com/apache/pulsar-client-python/pull/277#discussion_r2643208770


##########
pulsar/asyncio.py:
##########
@@ -145,12 +403,238 @@ async def create_producer(self, topic: str) -> Producer:
         ------
         PulsarException
         """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+
         future = asyncio.get_running_loop().create_future()
         conf = _pulsar.ProducerConfiguration()
-        # TODO: add more configs
-        self._client.create_producer_async(topic, conf, 
functools.partial(_set_future, future))
+        conf.send_timeout_millis(send_timeout_millis)
+        conf.compression_type(compression_type)
+        conf.max_pending_messages(max_pending_messages)
+        
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+        conf.block_if_queue_full(block_if_queue_full)
+        conf.batching_enabled(batching_enabled)
+        conf.batching_max_messages(batching_max_messages)
+        
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+        conf.partitions_routing_mode(message_routing_mode)
+        conf.batching_type(batching_type)
+        conf.chunking_enabled(chunking_enabled)
+        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+        conf.access_mode(access_mode)
+        if message_router is not None:
+            def underlying_router(msg, num_partitions):
+                return int(message_router(pulsar.Message._wrap(msg),
+                                          num_partitions))
+            conf.message_router(underlying_router)
+
+        if producer_name:
+            conf.producer_name(producer_name)
+        if initial_sequence_id is not None:
+            conf.initial_sequence_id(initial_sequence_id)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
+        conf.schema(schema.schema_info())
+        if encryption_key:
+            conf.encryption_key(encryption_key)
+        if crypto_key_reader:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+        if batching_enabled and chunking_enabled:
+            raise ValueError(
+                "Batching and chunking of messages can't be enabled together."
+            )
+
+        self._client.create_producer_async(
+            topic, conf, functools.partial(_set_future, future)
+        )
         return Producer(await future)
 
+    # pylint: 
disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments
+    async def subscribe(self, topic: Union[str, List[str]],
+                        subscription_name: str,
+                        consumer_type: pulsar.ConsumerType =
+                        pulsar.ConsumerType.Exclusive,
+                        schema: pulsar.schema.Schema | None = None,
+                        message_listener: Callable[['Consumer', 
pulsar.Message],
+                                                    None] | None = None,
+                        receiver_queue_size: int = 1000,
+                        max_total_receiver_queue_size_across_partitions: int =
+                        50000,
+                        consumer_name: str | None = None,
+                        unacked_messages_timeout_ms: int | None = None,
+                        broker_consumer_stats_cache_time_ms: int = 30000,
+                        negative_ack_redelivery_delay_ms: int = 60000,
+                        is_read_compacted: bool = False,
+                        properties: dict | None = None,
+                        initial_position: InitialPosition = 
InitialPosition.Latest,
+                        crypto_key_reader: pulsar.CryptoKeyReader | None = 
None,
+                        replicate_subscription_state_enabled: bool = False,
+                        max_pending_chunked_message: int = 10,
+                        auto_ack_oldest_chunked_message_on_queue_full: bool = 
False,
+                        start_message_id_inclusive: bool = False,
+                        batch_receive_policy: 
pulsar.ConsumerBatchReceivePolicy | None =
+                        None,
+                        key_shared_policy: pulsar.ConsumerKeySharedPolicy | 
None =
+                        None,
+                        batch_index_ack_enabled: bool = False,
+                        regex_subscription_mode: RegexSubscriptionMode =
+                        RegexSubscriptionMode.PersistentOnly,
+                        dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | 
None =
+                        None,
+                        crypto_failure_action: ConsumerCryptoFailureAction =
+                        ConsumerCryptoFailureAction.FAIL,
+                        is_pattern_topic: bool = False) -> Consumer:
+        """
+        Subscribe to the given topic and subscription combination.
+
+        Parameters
+        ----------
+        topic: str, List[str], or regex pattern
+            The name of the topic, list of topics or regex pattern.
+            When `is_pattern_topic` is True, `topic` is treated as a regex.
+        subscription_name: str
+            The name of the subscription.
+        consumer_type: pulsar.ConsumerType, 
default=pulsar.ConsumerType.Exclusive
+            Select the subscription type to be used when subscribing to the 
topic.
+        schema: pulsar.schema.Schema | None, default=None
+            Define the schema of the data that will be received by this 
consumer.
+        message_listener: Callable[[Consumer, pulsar.Message], None] | None, 
default=None
+            Sets a message listener for the consumer.
+        receiver_queue_size: int, default=1000
+            Sets the size of the consumer receive queue.
+        max_total_receiver_queue_size_across_partitions: int, default=50000
+            Set the max total receiver queue size across partitions.
+        consumer_name: str | None, default=None
+            Sets the consumer name.
+        unacked_messages_timeout_ms: int | None, default=None
+            Sets the timeout in milliseconds for unacknowledged messages.
+        broker_consumer_stats_cache_time_ms: int, default=30000
+            Sets the time duration for which the broker-side consumer stats
+            will be cached in the client.
+        negative_ack_redelivery_delay_ms: int, default=60000
+            The delay after which to redeliver the messages that failed to be
+            processed.
+        is_read_compacted: bool, default=False
+            Selects whether to read the compacted version of the topic.
+        properties: dict | None, default=None
+            Sets the properties for the consumer.
+        initial_position: InitialPosition, default=InitialPosition.Latest
+            Set the initial position of a consumer when subscribing to the 
topic.
+        crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
+            Symmetric encryption class implementation.
+        replicate_subscription_state_enabled: bool, default=False
+            Set whether the subscription status should be replicated.
+        max_pending_chunked_message: int, default=10
+            Consumer buffers chunk messages into memory until it receives all 
the chunks.
+        auto_ack_oldest_chunked_message_on_queue_full: bool, default=False
+            Automatically acknowledge oldest chunked messages on queue
+            full.
+        start_message_id_inclusive: bool, default=False
+            Set the consumer to include the given position of any reset
+            operation.
+        batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None, 
default=None
+            Set the batch collection policy for batch receiving.
+        key_shared_policy: pulsar.ConsumerKeySharedPolicy | None, default=None
+            Set the key shared policy for use when the ConsumerType is
+            KeyShared.
+        batch_index_ack_enabled: bool, default=False
+            Enable the batch index acknowledgement.
+        regex_subscription_mode: RegexSubscriptionMode,
+            default=RegexSubscriptionMode.PersistentOnly
+            Set the regex subscription mode for use when the topic is a regex
+            pattern.
+        dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None, 
default=None
+            Set dead letter policy for consumer.
+        crypto_failure_action: ConsumerCryptoFailureAction,
+            default=ConsumerCryptoFailureAction.FAIL
+            Set the behavior when the decryption fails.
+        is_pattern_topic: bool, default=False
+            Whether `topic` is a regex pattern. This option takes no effect
+            when `topic` is a list of topics.
+
+        Returns
+        -------
+        Consumer
+            The consumer created
+
+        Raises
+        ------
+        PulsarException
+        """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+
+        future = asyncio.get_running_loop().create_future()
+        conf = _pulsar.ConsumerConfiguration()
+        conf.consumer_type(consumer_type)
+        conf.regex_subscription_mode(regex_subscription_mode)
+        conf.read_compacted(is_read_compacted)
+        if message_listener:
+            conf.message_listener(_listener_wrapper(message_listener, schema))

Review Comment:
   The message listener callback parameter type hint shows 'Consumer' (the 
asyncio Consumer class), but the wrapper creates a Consumer from a C extension 
consumer object. This could lead to confusion as the message listener callback 
in the asyncio context will run synchronously in a background thread (from the 
C++ callback), not in the async event loop. The Consumer object passed to the 
listener won't support await operations properly. This should either be 
documented or the feature should be redesigned to work properly with asyncio.



##########
pulsar/asyncio.py:
##########
@@ -116,6 +127,174 @@ async def close(self) -> None:
         self._producer.close_async(functools.partial(_set_future, future, 
value=None))
         await future
 
+class Consumer:
+    """
+    The Pulsar message consumer, used to subscribe to messages from a topic.
+    """
+
+    def __init__(self, consumer: _pulsar.Consumer) -> None:
+        """
+        Create the consumer.
+        Users should not call this constructor directly. Instead, create the
+        consumer via `Client.subscribe`.
+
+        Parameters
+        ----------
+        consumer: _pulsar.Consumer
+            The underlying Consumer object from the C extension.
+        """
+        self._consumer: _pulsar.Consumer = consumer
+
+    async def receive(self) -> pulsar.Message:
+        """
+        Receive a single message asynchronously.
+
+        Returns
+        -------
+        pulsar.Message
+            The message received.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.receive_async(functools.partial(_set_future, future))
+        msg = await future
+        m = pulsar.Message()
+        m._message = msg
+        m._schema = pulsar.schema.BytesSchema()

Review Comment:
   The receive method always sets the schema to BytesSchema(), ignoring the 
schema parameter passed to subscribe(). This means that if a user specifies a 
custom schema when subscribing, the received messages will still use 
BytesSchema() instead of the configured schema. The schema should be stored in 
the Consumer instance during initialization and used here instead of hardcoding 
BytesSchema().



##########
pulsar/asyncio.py:
##########
@@ -127,14 +306,93 @@ def __init__(self, service_url, **kwargs) -> None:
         """
         self._client: _pulsar.Client = pulsar.Client(service_url, 
**kwargs)._client
 
-    async def create_producer(self, topic: str) -> Producer:
+    # pylint: 
disable=too-many-arguments,too-many-locals,too-many-positional-arguments
+    async def create_producer(self, topic: str,
+                              producer_name: str | None = None,
+                              schema: pulsar.schema.Schema | None = None,
+                              initial_sequence_id: int | None = None,
+                              send_timeout_millis: int = 30000,
+                              compression_type: CompressionType = 
CompressionType.NONE,
+                              max_pending_messages: int = 1000,
+                              max_pending_messages_across_partitions: int = 
50000,
+                              block_if_queue_full: bool = False,
+                              batching_enabled: bool = True,
+                              batching_max_messages: int = 1000,
+                              batching_max_allowed_size_in_bytes: int = 
128*1024,
+                              batching_max_publish_delay_ms: int = 10,
+                              chunking_enabled: bool = False,
+                              message_routing_mode: PartitionsRoutingMode =
+                              PartitionsRoutingMode.RoundRobinDistribution,
+                              lazy_start_partitioned_producers: bool = False,
+                              properties: dict | None = None,
+                              batching_type: BatchingType = 
BatchingType.Default,
+                              encryption_key: str | None = None,
+                              crypto_key_reader: pulsar.CryptoKeyReader | None 
= None,
+                              access_mode: ProducerAccessMode = 
ProducerAccessMode.Shared,
+                              message_router: Callable[[pulsar.Message, int], 
int] | None = None,
+                              ) -> Producer:
         """
         Create a new producer on a given topic
 
         Parameters
         ----------
         topic: str
             The topic name
+        producer_name: str | None, default=None
+            Specify a name for the producer. If not assigned, the system will
+            generate a globally unique name which can be accessed with
+            `Producer.producer_name()`. When specifying a name, it is app to

Review Comment:
   The documentation states "When specifying a name, it is app to the user to 
ensure that..." which contains a grammatical error. This should be "it is up to 
the user to ensure that..."
   ```suggestion
               `Producer.producer_name()`. When specifying a name, it is up to
   ```



##########
pulsar/asyncio.py:
##########
@@ -127,14 +306,93 @@ def __init__(self, service_url, **kwargs) -> None:
         """
         self._client: _pulsar.Client = pulsar.Client(service_url, 
**kwargs)._client
 
-    async def create_producer(self, topic: str) -> Producer:
+    # pylint: 
disable=too-many-arguments,too-many-locals,too-many-positional-arguments
+    async def create_producer(self, topic: str,
+                              producer_name: str | None = None,
+                              schema: pulsar.schema.Schema | None = None,
+                              initial_sequence_id: int | None = None,
+                              send_timeout_millis: int = 30000,
+                              compression_type: CompressionType = 
CompressionType.NONE,
+                              max_pending_messages: int = 1000,
+                              max_pending_messages_across_partitions: int = 
50000,
+                              block_if_queue_full: bool = False,
+                              batching_enabled: bool = True,
+                              batching_max_messages: int = 1000,
+                              batching_max_allowed_size_in_bytes: int = 
128*1024,
+                              batching_max_publish_delay_ms: int = 10,
+                              chunking_enabled: bool = False,
+                              message_routing_mode: PartitionsRoutingMode =
+                              PartitionsRoutingMode.RoundRobinDistribution,
+                              lazy_start_partitioned_producers: bool = False,
+                              properties: dict | None = None,
+                              batching_type: BatchingType = 
BatchingType.Default,
+                              encryption_key: str | None = None,
+                              crypto_key_reader: pulsar.CryptoKeyReader | None 
= None,
+                              access_mode: ProducerAccessMode = 
ProducerAccessMode.Shared,
+                              message_router: Callable[[pulsar.Message, int], 
int] | None = None,
+                              ) -> Producer:
         """
         Create a new producer on a given topic
 
         Parameters
         ----------
         topic: str
             The topic name
+        producer_name: str | None, default=None
+            Specify a name for the producer. If not assigned, the system will
+            generate a globally unique name which can be accessed with
+            `Producer.producer_name()`. When specifying a name, it is app to
+            the user to ensure that, for a given topic, the producer name is
+            unique across all Pulsar's clusters.
+        schema: pulsar.schema.Schema | None, default=None
+            Define the schema of the data that will be published by this 
producer.
+        initial_sequence_id: int | None, default=None
+            Set the baseline for the sequence ids for messages published by
+            the producer.
+        send_timeout_millis: int, default=30000
+            If a message is not acknowledged by the server before the
+            send_timeout expires, an error will be reported.
+        compression_type: CompressionType, default=CompressionType.NONE
+            Set the compression type for the producer.
+        max_pending_messages: int, default=1000
+            Set the max size of the queue holding the messages pending to
+            receive an acknowledgment from the broker.
+        max_pending_messages_across_partitions: int, default=50000
+            Set the max size of the queue holding the messages pending to
+            receive an acknowledgment across partitions.
+        block_if_queue_full: bool, default=False
+            Set whether send operations should block when the outgoing
+            message queue is full.
+        batching_enabled: bool, default=True

Review Comment:
   The documentation states "batching_enabled: bool, default=False" which 
contradicts the actual default value of True in the function signature and the 
PR description which notes that "batching is enabled by default for async 
producer". The documentation should say "default=True" to match the 
implementation.
   ```suggestion
           batching_enabled: bool, default=True
   ```



##########
pulsar/asyncio.py:
##########
@@ -145,12 +403,238 @@ async def create_producer(self, topic: str) -> Producer:
         ------
         PulsarException
         """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+
         future = asyncio.get_running_loop().create_future()
         conf = _pulsar.ProducerConfiguration()
-        # TODO: add more configs
-        self._client.create_producer_async(topic, conf, 
functools.partial(_set_future, future))
+        conf.send_timeout_millis(send_timeout_millis)
+        conf.compression_type(compression_type)
+        conf.max_pending_messages(max_pending_messages)
+        
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+        conf.block_if_queue_full(block_if_queue_full)
+        conf.batching_enabled(batching_enabled)
+        conf.batching_max_messages(batching_max_messages)
+        
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+        conf.partitions_routing_mode(message_routing_mode)
+        conf.batching_type(batching_type)
+        conf.chunking_enabled(chunking_enabled)
+        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+        conf.access_mode(access_mode)
+        if message_router is not None:
+            def underlying_router(msg, num_partitions):
+                return int(message_router(pulsar.Message._wrap(msg),
+                                          num_partitions))
+            conf.message_router(underlying_router)
+
+        if producer_name:
+            conf.producer_name(producer_name)
+        if initial_sequence_id is not None:
+            conf.initial_sequence_id(initial_sequence_id)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
+        conf.schema(schema.schema_info())
+        if encryption_key:
+            conf.encryption_key(encryption_key)
+        if crypto_key_reader:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+        if batching_enabled and chunking_enabled:
+            raise ValueError(
+                "Batching and chunking of messages can't be enabled together."
+            )
+
+        self._client.create_producer_async(
+            topic, conf, functools.partial(_set_future, future)
+        )
         return Producer(await future)
 
+    # pylint: 
disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments
+    async def subscribe(self, topic: Union[str, List[str]],
+                        subscription_name: str,
+                        consumer_type: pulsar.ConsumerType =
+                        pulsar.ConsumerType.Exclusive,
+                        schema: pulsar.schema.Schema | None = None,
+                        message_listener: Callable[['Consumer', 
pulsar.Message],
+                                                    None] | None = None,
+                        receiver_queue_size: int = 1000,
+                        max_total_receiver_queue_size_across_partitions: int =
+                        50000,
+                        consumer_name: str | None = None,
+                        unacked_messages_timeout_ms: int | None = None,
+                        broker_consumer_stats_cache_time_ms: int = 30000,
+                        negative_ack_redelivery_delay_ms: int = 60000,
+                        is_read_compacted: bool = False,
+                        properties: dict | None = None,
+                        initial_position: InitialPosition = 
InitialPosition.Latest,
+                        crypto_key_reader: pulsar.CryptoKeyReader | None = 
None,
+                        replicate_subscription_state_enabled: bool = False,
+                        max_pending_chunked_message: int = 10,
+                        auto_ack_oldest_chunked_message_on_queue_full: bool = 
False,
+                        start_message_id_inclusive: bool = False,
+                        batch_receive_policy: 
pulsar.ConsumerBatchReceivePolicy | None =
+                        None,
+                        key_shared_policy: pulsar.ConsumerKeySharedPolicy | 
None =
+                        None,
+                        batch_index_ack_enabled: bool = False,
+                        regex_subscription_mode: RegexSubscriptionMode =
+                        RegexSubscriptionMode.PersistentOnly,
+                        dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | 
None =
+                        None,
+                        crypto_failure_action: ConsumerCryptoFailureAction =
+                        ConsumerCryptoFailureAction.FAIL,
+                        is_pattern_topic: bool = False) -> Consumer:
+        """
+        Subscribe to the given topic and subscription combination.
+
+        Parameters
+        ----------
+        topic: str, List[str], or regex pattern
+            The name of the topic, list of topics or regex pattern.
+            When `is_pattern_topic` is True, `topic` is treated as a regex.
+        subscription_name: str
+            The name of the subscription.
+        consumer_type: pulsar.ConsumerType, 
default=pulsar.ConsumerType.Exclusive
+            Select the subscription type to be used when subscribing to the 
topic.
+        schema: pulsar.schema.Schema | None, default=None
+            Define the schema of the data that will be received by this 
consumer.
+        message_listener: Callable[[Consumer, pulsar.Message], None] | None, 
default=None
+            Sets a message listener for the consumer.
+        receiver_queue_size: int, default=1000
+            Sets the size of the consumer receive queue.
+        max_total_receiver_queue_size_across_partitions: int, default=50000
+            Set the max total receiver queue size across partitions.
+        consumer_name: str | None, default=None
+            Sets the consumer name.
+        unacked_messages_timeout_ms: int | None, default=None
+            Sets the timeout in milliseconds for unacknowledged messages.
+        broker_consumer_stats_cache_time_ms: int, default=30000
+            Sets the time duration for which the broker-side consumer stats
+            will be cached in the client.
+        negative_ack_redelivery_delay_ms: int, default=60000
+            The delay after which to redeliver the messages that failed to be
+            processed.
+        is_read_compacted: bool, default=False
+            Selects whether to read the compacted version of the topic.
+        properties: dict | None, default=None
+            Sets the properties for the consumer.
+        initial_position: InitialPosition, default=InitialPosition.Latest
+            Set the initial position of a consumer when subscribing to the 
topic.
+        crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
+            Symmetric encryption class implementation.
+        replicate_subscription_state_enabled: bool, default=False
+            Set whether the subscription status should be replicated.
+        max_pending_chunked_message: int, default=10
+            Consumer buffers chunk messages into memory until it receives all 
the chunks.
+        auto_ack_oldest_chunked_message_on_queue_full: bool, default=False
+            Automatically acknowledge oldest chunked messages on queue
+            full.
+        start_message_id_inclusive: bool, default=False
+            Set the consumer to include the given position of any reset
+            operation.
+        batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None, 
default=None
+            Set the batch collection policy for batch receiving.
+        key_shared_policy: pulsar.ConsumerKeySharedPolicy | None, default=None
+            Set the key shared policy for use when the ConsumerType is
+            KeyShared.
+        batch_index_ack_enabled: bool, default=False
+            Enable the batch index acknowledgement.
+        regex_subscription_mode: RegexSubscriptionMode,
+            default=RegexSubscriptionMode.PersistentOnly
+            Set the regex subscription mode for use when the topic is a regex
+            pattern.
+        dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None, 
default=None
+            Set dead letter policy for consumer.
+        crypto_failure_action: ConsumerCryptoFailureAction,
+            default=ConsumerCryptoFailureAction.FAIL
+            Set the behavior when the decryption fails.
+        is_pattern_topic: bool, default=False
+            Whether `topic` is a regex pattern. This option takes no effect
+            when `topic` is a list of topics.
+
+        Returns
+        -------
+        Consumer
+            The consumer created
+
+        Raises
+        ------
+        PulsarException
+        """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+
+        future = asyncio.get_running_loop().create_future()
+        conf = _pulsar.ConsumerConfiguration()
+        conf.consumer_type(consumer_type)
+        conf.regex_subscription_mode(regex_subscription_mode)
+        conf.read_compacted(is_read_compacted)
+        if message_listener:
+            conf.message_listener(_listener_wrapper(message_listener, schema))
+        conf.receiver_queue_size(receiver_queue_size)
+        conf.max_total_receiver_queue_size_across_partitions(
+            max_total_receiver_queue_size_across_partitions
+        )
+        if consumer_name:
+            conf.consumer_name(consumer_name)
+        if unacked_messages_timeout_ms:
+            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
+
+        conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
+        
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+        conf.subscription_initial_position(initial_position)
+
+        conf.schema(schema.schema_info())
+
+        if crypto_key_reader:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+        
conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
+        conf.max_pending_chunked_message(max_pending_chunked_message)
+        conf.auto_ack_oldest_chunked_message_on_queue_full(
+            auto_ack_oldest_chunked_message_on_queue_full
+        )
+        conf.start_message_id_inclusive(start_message_id_inclusive)
+        if batch_receive_policy:
+            conf.batch_receive_policy(batch_receive_policy.policy())
+
+        if key_shared_policy:
+            conf.key_shared_policy(key_shared_policy.policy())
+        conf.batch_index_ack_enabled(batch_index_ack_enabled)
+        if dead_letter_policy:
+            conf.dead_letter_policy(dead_letter_policy.policy())
+        conf.crypto_failure_action(crypto_failure_action)
+
+        if isinstance(topic, str):
+            if is_pattern_topic:
+                self._client.subscribe_async_pattern(
+                    topic, subscription_name, conf,
+                    functools.partial(_set_future, future)
+                )
+            else:
+                self._client.subscribe_async(
+                    topic, subscription_name, conf,
+                    functools.partial(_set_future, future)
+                )
+        elif isinstance(topic, list):

Review Comment:
   When is_pattern_topic=True and topic is a list, the is_pattern_topic flag is 
silently ignored as documented in line 555-556. However, this could lead to 
confusion since the code doesn't warn or raise an error when an invalid 
combination is provided. Consider adding validation to raise a ValueError when 
is_pattern_topic=True and topic is a list, since pattern matching doesn't make 
sense with explicit topic lists.
   ```suggestion
           elif isinstance(topic, list):
               if is_pattern_topic:
                   raise ValueError(
                       "Argument 'topic' must be a string when "
                       "'is_pattern_topic' is True; lists of topics do not "
                       "support pattern subscriptions"
                   )
   ```



##########
pulsar/asyncio.py:
##########
@@ -170,3 +656,12 @@ def complete():
         else:
             future.set_exception(PulsarException(result))
     future.get_loop().call_soon_threadsafe(complete)
+
+def _listener_wrapper(listener, schema):
+    def wrapper(consumer, msg):
+        c = Consumer(consumer)
+        m = pulsar.Message()
+        m._message = msg
+        m._schema = schema
+        listener(c, m)
+    return wrapper

Review Comment:
   The _listener_wrapper function correctly accepts a schema parameter and uses 
it when wrapping messages, but unlike Consumer.receive(), it properly preserves 
the schema. However, the schema parameter should be documented in the function, 
and there should be consistency with how Consumer stores and uses the schema.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to