RongtongJin commented on code in PR #1256:
URL: https://github.com/apache/rocketmq-clients/pull/1256#discussion_r3497121187


##########
.github/workflows/php_build.yml:
##########
@@ -17,9 +17,20 @@ jobs:
         uses: shivammathur/setup-php@v2
         with:
           php-version: ${{ matrix.php-version }}
+          extensions: grpc, protobuf
       - name: Validate composer.json
         working-directory: ./php
         run: composer validate
       - name: Install Dependencies
         working-directory: ./php
-        run: composer install
+        run: composer install --no-interaction --prefer-dist
+      - name: Run PHPUnit Tests
+        if: runner.os != 'Windows'
+        working-directory: ./php
+        run: vendor/bin/phpunit --testsuite "RocketMQ PHP Test Suite" --no-coverage
+      - name: Run PHPUnit Tests (Windows)
+        if: runner.os == 'Windows'
+        working-directory: ./php
+        # Workaround: gRPC C extension may cause non-zero exit during PHP shutdown on
+        # Windows even when all tests pass. Force exit 0 to prevent false CI failures.
+        run: vendor/bin/phpunit --testsuite "RocketMQ PHP Test Suite" --no-coverage; exit 0

Review Comment:
   This makes the Windows leg unable to fail: any PHPUnit failure is converted 
to success. If the gRPC extension has a known shutdown-only issue, please gate 
that workaround narrowly, for example by detecting that specific shutdown 
failure, or isolate/skip the affected tests, instead of appending `exit 0` to 
the whole test command.
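
   One way to gate the workaround narrowly is to ignore PHPUnit's process exit code on Windows only when a machine-readable report shows that every test passed. The sketch below assumes the Windows step runs PHPUnit with `--log-junit` and then hands the report to a small helper; the helper name `junitReportPassed()` and the wrapper-script approach are illustrative assumptions, not part of this PR.

```php
<?php

/**
 * Hypothetical helper: decide pass/fail from a JUnit XML report
 * instead of from the PHPUnit process exit code.
 */
function junitReportPassed(string $xml): bool
{
    // Collect libxml errors internally so a broken report does not emit warnings.
    $previous = libxml_use_internal_errors(true);
    $doc = simplexml_load_string($xml);
    libxml_clear_errors();
    libxml_use_internal_errors($previous);

    if ($doc === false) {
        return false; // Missing or unparsable report: treat as a failure.
    }

    $testCases = $doc->xpath('//testcase') ?: [];
    $problems  = $doc->xpath('//testcase/failure | //testcase/error') ?: [];

    // Pass only if at least one test ran and none failed or errored.
    return count($testCases) > 0 && count($problems) === 0;
}
```

   The Windows step could then exit non-zero whenever this returns `false`, so a real test failure still fails CI and only the shutdown-time exit code is ignored.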



##########
php/TransactionTrait.php:
##########
@@ -0,0 +1,283 @@
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+use Apache\Rocketmq\V2\EndTransactionRequest;
+use Apache\Rocketmq\V2\Endpoints;
+use Apache\Rocketmq\V2\TransactionResolution;
+use Apache\Rocketmq\V2\TransactionSource;
+use Apache\Rocketmq\V2\Resource;
+use Apache\Rocketmq\V2\Message;
+
+/**
+ * TransactionTrait — Transaction (half-message) send, commit, rollback, and 
orphaned recovery.
+ *
+ * Extracted from Producer. The using class must provide:
+ * - Properties: $isRunning, $validator (MessageValidator, protected)
+ * - ClientTrait methods: buildMetadata(), getOperationTimeout()
+ * - Send delegation: validateMessage(), detectMessageType(),
+ *   wrapTransactionMessageRequest(), sendMessageWithRetry()
+ * - Route delegation: getPublishingLoadBalancer(), getIsolatedBrokerNames()
+ * - Infrastructure: getClientForRpc(), getTelemetrySession(), getLogger(),
+ *   getSettingsMaxAttempts(), getSettingsTlsCredentials(), 
isSettingsSslEnabled()
+ * - Interceptors: executeInterceptors()
+ */
+trait TransactionTrait
+{
+    private ?TransactionChecker $transactionChecker = null;
+    private ?LocalTransactionExecuter $localTransactionExecuter = null;
+
+    /**
+     * Set transaction checker for orphaned transaction recovery.
+     */
+    public function setTransactionChecker(TransactionChecker $checker): self
+    {
+        $this->transactionChecker = $checker;
+        return $this;
+    }
+
+    /**
+     * Set local transaction executer for auto commit/rollback of 
half-messages.
+     */
+    public function setLocalTransactionExecuter(LocalTransactionExecuter 
$executer): self
+    {
+        $this->localTransactionExecuter = $executer;
+        return $this;
+    }
+
+    /**
+     * Send a transaction message (half-message + local transaction + 
commit/rollback).
+     */
+    public function sendWithTransaction(Message $message, Transaction $transaction, ?LocalTransactionExecuter $executor = null): array
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+
+        $this->validateMessage($message);
+
+        $sysProps = $message->getSystemProperties();
+        $hasMessageGroup = $sysProps && $sysProps->hasMessageGroup();
+        $hasLiteTopic = $sysProps && $sysProps->hasLiteTopic();
+        $hasDeliveryTimestamp = $sysProps && $sysProps->hasDeliveryTimestamp();
+        $hasPriority = $sysProps && $sysProps->hasPriority();
+
+        if ($hasMessageGroup || $hasLiteTopic || $hasDeliveryTimestamp || $hasPriority) {
+            throw new \InvalidArgumentException(
+                "Transactional message should not set messageGroup, deliveryTimestamp, liteTopic, or priority"
+            );
+        }
+
+        $topic = $message->getTopic()->getName();
+        $loadBalancer = $this->getPublishingLoadBalancer($topic);
+        $messageQueue = $loadBalancer->takeMessageQueue($this->getIsolatedBrokerNames(), $this->getSettingsMaxAttempts());
+
+        if (empty($messageQueue)) {
+            throw new \RuntimeException("No available message queue for topic: {$topic}");
+        }
+
+        if ($this->validator->isValidateMessageType()) {
+            $msgType = $this->detectMessageType($message, true);
+            $loadBalancer->validateMessageTypeAgainstQueue($messageQueue[0], $msgType, $topic);
+        }
+
+        $request = $this->wrapTransactionMessageRequest([$message], $messageQueue[0]);
+        $result = $this->sendMessageWithRetry($request, $message, $messageQueue, $this->getSettingsMaxAttempts());
+
+        if (isset($result['transactionId'])) {
+            $transaction->tryAddMessage($message);
+            $transaction->tryAddReceipt($message, $result, PublishingRouteManager::extractMessageQueueEndpoint($messageQueue[0]));
+        }
+
+        if ($executor !== null) {

Review Comment:
   The builder-level `LocalTransactionExecuter` is never used here; only the 
per-call `$executor` is checked. A producer configured through 
`ProducerBuilder::setLocalTransactionExecuter()` will send the half message but 
never execute/commit/rollback the local transaction unless the caller passes 
another executor to this method. Please use `$executor ?? 
$this->localTransactionExecuter`.
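
   A minimal sketch of the suggested fallback follows. The `Executer` interface and the `ProducerSketch` class are simplified stand-ins introduced for illustration; they are not the PR's real types or signatures.

```php
<?php

// Simplified stand-in for LocalTransactionExecuter (hypothetical shape).
interface Executer
{
    public function execute(string $body): bool;
}

final class ProducerSketch
{
    private ?Executer $localTransactionExecuter = null;

    public function setLocalTransactionExecuter(Executer $executer): self
    {
        $this->localTransactionExecuter = $executer;
        return $this;
    }

    /** Returns 'commit', 'rollback', or 'pending' when no executer is available. */
    public function sendWithTransaction(string $body, ?Executer $executor = null): string
    {
        // The per-call executer wins; otherwise fall back to the configured one.
        $effective = $executor ?? $this->localTransactionExecuter;
        if ($effective === null) {
            return 'pending';
        }
        return $effective->execute($body) ? 'commit' : 'rollback';
    }
}
```

   With this shape, a producer configured once through the setter resolves its transactions without every caller having to pass an executer again.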



##########
php/MessageValidator.php:
##########
@@ -0,0 +1,128 @@
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+use Apache\Rocketmq\V2\Message;
+use Apache\Rocketmq\V2\MessageType as V2MessageType;
+
+/**
+ * MessageValidator - Message validation and type detection.
+ *
+ * Extracted from Producer to separate validation concerns:
+ * 1. validateMessage() - validates topic, body, and body size
+ * 2. detectMessageType() - determines 
NORMAL/FIFO/DELAY/PRIORITY/TRANSACTION/LITE
+ * 3. Configurable max body size and message type validation (updated by 
server settings)
+ */
+class MessageValidator
+{
+    private int $maxBodySizeBytes;
+    private bool $validateMessageType;
+
+    public function __construct(int $maxBodySizeBytes = 4194304, bool $validateMessageType = true)
+    {
+        $this->maxBodySizeBytes = $maxBodySizeBytes;
+        $this->validateMessageType = $validateMessageType;
+    }
+
+    /**
+     * Validate a message before sending.
+     *
+     * @param Message $message The message to validate
+     * @throws \InvalidArgumentException If validation fails
+     */
+    public function validateMessage(Message $message): void
+    {
+        if (!$message->hasTopic() || empty(trim($message->getTopic()->getName()))) {
+            throw new \InvalidArgumentException("Message topic is required");
+        }
+        if (empty($message->getBody())) {

Review Comment:
   `empty()` rejects `"0"`, which is a valid non-empty message body in PHP. 
This validator is stricter than `MessageBuilder`'s null-only body check. Please 
use a strict empty-string check, such as `$body === ''` or `strlen($body) === 
0`.
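
   The difference is easy to reproduce in isolation. The helper names below are hypothetical and exist only to contrast the two checks:

```php
<?php

// Mirrors the current check: empty() treats the string "0" as empty.
function isBodyMissingLoose(string $body): bool
{
    return empty($body);
}

// Suggested check: only the zero-length string counts as empty.
function isBodyMissingStrict(string $body): bool
{
    return $body === '';
}

var_dump(isBodyMissingLoose('0'));  // bool(true): "0" is wrongly rejected
var_dump(isBodyMissingStrict('0')); // bool(false): "0" is accepted
var_dump(isBodyMissingStrict(''));  // bool(true): a truly empty body is still rejected
```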



##########
php/TelemetrySession.php:
##########
@@ -0,0 +1,949 @@
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+
+use Apache\Rocketmq\V2\MessagingServiceClient;
+use Apache\Rocketmq\V2\TelemetryCommand;
+use Apache\Rocketmq\V2\Settings;
+use Exception;
+use Grpc\ChannelCredentials;
+
+/**
+ * TelemetrySession - Telemetry Session (full implementation referencing Java 
ClientSessionImpl)
+ *
+ * Core features:
+ * 1. Singleton pattern (same Endpoints share Session)
+ * 2. Settings sync confirmation mechanism
+ * 3. Bidirectional stream management
+ * 4. Command dispatch processing
+ * 5. Automatic reconnection mechanism
+ * 6. Swoole coroutine background reader for server-pushed commands
+ */
+class TelemetrySession
+{
+    private static array $instances = [];
+    private static array $instanceTimestamps = [];
+    private const MAX_INSTANCES = 10;
+    /** Session TTL in seconds; sessions older than this are evicted even if 
stream appears alive. */
+    private const SESSION_TTL_SECONDS = 1800; // 30 minutes
+    private object $client;
+    private string $endpoints;
+    /** @var object|null gRPC stream */
+    private $stream;
+    private Logger $logger;
+    private string $clientId;
+
+    // Settings sync state
+    private bool $settingsSynced = false;
+    private ?string $settingsError = null;
+    /**
+     * Maximum number of consecutive errors before giving up.
+     */
+    private const MAX_CONSECUTIVE_ERRORS = 10;
+    /**
+     * Read timeout in seconds.
+     */
+    private const READ_TIMEOUT_SECONDS = 30.0;
+    private float $settingsTimeout = 3.0; // seconds, matching Java's 
SETTINGS_INITIALIZATION_TIMEOUT
+
+    // Credentials for AK/SK signing
+    private ?SessionCredentials $credentials = null;
+
+    // Namespace for resource scoping
+    private string $namespace = '';
+
+    // Settings received from server
+    private ?object $serverSettings = null;
+
+    // Settings change callback
+    /** @var callable|null */
+    private $onSettingsChange = null;
+
+    // Server command callbacks
+    /** @var callable|null */
+    private $onRecoverOrphanedTransaction = null;
+    /** @var callable|null */
+    private $onVerifyMessage = null;
+    /** @var callable|null */
+    private $onPrintThreadStackTrace = null;
+    /** @var callable|null */
+    private $onReconnectEndpoints = null;
+    /** @var callable|null */
+    private $onNotifyUnsubscribeLite = null;
+
+    // Swoole coroutine reader state
+    private int $swooleCoroutineId = -1;
+    private bool $isClosing = false;
+    private bool $isReconnecting = false;
+    private ?object $lastSettingsCommand = null;
+
+    /**
+     * Initialize telemetry session with client and connection details.
+     *
+     * @param object $client gRPC messaging service client
+     * @param string $endpoints Server endpoints
+     * @param string|null $clientId Client identifier
+     * @param SessionCredentials|null $credentials Session credentials for 
signing
+     * @param string $namespace Resource namespace
+     */
+    private function __construct(object $client, string $endpoints, ?string 
$clientId = null, ?SessionCredentials $credentials = null, string $namespace = 
'')
+    {
+        $this->client = $client;
+        $this->endpoints = $endpoints;
+        $this->credentials = $credentials;
+        $this->namespace = $namespace;
+        $this->logger = Logger::getInstance('TelemetrySession');
+        if ($clientId) {
+            $this->clientId = $clientId;
+        }
+    }
+
+    /**
+     * Register callback for server settings changes.
+     *
+     * @param callable $callback Callback receiving server Settings
+     * @return void
+     */
+    public function setOnSettingsChange(callable $callback): void
+    {
+        $this->onSettingsChange = $callback;
+    }
+
+    /**
+     * Register callback for orphaned transaction recovery.
+     *
+     * @param callable $callback Callback receiving 
RecoverOrphanedTransactionCommand
+     * @return void
+     */
+    public function setOnRecoverOrphanedTransaction(callable $callback): void
+    {
+        $this->onRecoverOrphanedTransaction = $callback;
+    }
+
+    /**
+     * Register callback for message verification.
+     *
+     * @param callable $callback Callback receiving VerifyMessageCommand
+     * @return void
+     */
+    public function setOnVerifyMessage(callable $callback): void
+    {
+        $this->onVerifyMessage = $callback;
+    }
+
+    /**
+     * Register callback for printing thread stack trace.
+     *
+     * @param callable $callback Callback receiving 
PrintThreadStackTraceCommand
+     * @return void
+     */
+    public function setOnPrintThreadStackTrace(callable $callback): void
+    {
+        $this->onPrintThreadStackTrace = $callback;
+    }
+
+    /**
+     * Register callback for endpoint reconnection.
+     *
+     * @param callable $callback Callback receiving ReconnectEndpointsCommand
+     * @return void
+     */
+    public function setOnReconnectEndpoints(callable $callback): void
+    {
+        $this->onReconnectEndpoints = $callback;
+    }
+
+    /**
+     * Register callback for unsubscribe notification.
+     *
+     * @param callable $callback Callback receiving 
NotifyUnsubscribeLiteCommand
+     * @return void
+     */
+    public function setOnNotifyUnsubscribeLite(callable $callback): void
+    {
+        $this->onNotifyUnsubscribeLite = $callback;
+    }
+
+    /**
+     * Get the current server settings.
+     *
+     * @return object|null Server settings object or null
+     */
+    public function getServerSettings()
+    {
+        return $this->serverSettings;
+    }
+
+    /**
+     * Reset all session instances (mainly for testing).
+     *
+     * @return void
+     */
+    public static function resetAll(): void
+    {
+        self::$instances = [];
+        self::$instanceTimestamps = [];
+    }
+
+    /**
+     * Get or create a session instance for the given endpoints.
+     *
+     * @param object $client gRPC messaging service client
+     * @param string $endpoints Server endpoints
+     * @param string|null $clientId Client identifier
+     * @param SessionCredentials|null $credentials Session credentials
+     * @param string $namespace Resource namespace
+     * @return self
+     */
+    public static function getInstance(object $client, string $endpoints, 
?string $clientId = null, ?SessionCredentials $credentials = null, string 
$namespace = ''): self
+    {
+        $credId = $credentials !== null ? spl_object_id($credentials) : 'none';
+        $effectiveClientId = $clientId ?? 'none';
+        $key = $endpoints . '|' . $credId . '|' . $namespace . '|' . 
$effectiveClientId;
+
+        if (isset(self::$instances[$key])) {
+            $existing = self::$instances[$key];
+            $age = time() - (self::$instanceTimestamps[$key] ?? 0);
+
+            if (!$existing->isAlive() || $age > self::SESSION_TTL_SECONDS) {
+                $reason = !$existing->isAlive() ? 'dead stream' : "TTL expired 
({$age}s > " . self::SESSION_TTL_SECONDS . "s)";
+                Logger::getInstance('TelemetrySession')->info("Evicting stale 
session for endpoints: {$endpoints}, reason: {$reason}");
+                $existing->close();
+                unset(self::$instances[$key]);
+                unset(self::$instanceTimestamps[$key]);
+            }
+        }
+
+        if (!isset(self::$instances[$key])) {
+            if (count(self::$instances) >= self::MAX_INSTANCES) {
+                self::evictOldest();
+            }
+            Logger::getInstance('TelemetrySession')->info("Creating new 
session for endpoints: {$endpoints}, clientId: {$effectiveClientId}");
+            $instance = new self($client, $endpoints, $clientId, $credentials, 
$namespace);
+            self::$instances[$key] = $instance;
+            self::$instanceTimestamps[$key] = time();
+        }
+
+        return self::$instances[$key];
+    }
+
+    /**
+     * Check if this session is still alive.
+     *
+     * Health check hierarchy:
+     *   1. isClosing flag → immediately stale
+     *   2. Stream was created but is now closed → stale
+     *   3. Stream exists but write probe fails → stale
+     *   4. Session never started (no stream yet) → considered alive (pending 
start)
+     *
+     * @return bool
+     */
+    private function isAlive(): bool
+    {
+        if ($this->isClosing) {
+            return false;
+        }
+
+        if ($this->stream !== null) {
+            if ($this->isStreamClosed()) {
+                return false;
+            }
+            // Probe write capability: if write fails, the stream is stale
+            if (!$this->probeStreamWritable()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Non-destructive probe to check if the stream is still writable.
+     * Sends a zero-length write which gRPC treats as a keepalive check.
+     *
+     * @return bool true if stream accepts writes
+     */
+    private function probeStreamWritable(): bool
+    {
+        if ($this->stream === null) {
+            return false;
+        }
+        try {
+            // Attempt flush as a lightweight connectivity probe.
+            // gRPC flush will throw if the underlying channel is broken.
+            $this->stream->flush();
+            return true;
+        } catch (\Throwable $e) {
+            $this->logger->debug("Stream write probe failed: " . 
$e->getMessage());
+            return false;
+        }
+    }
+
+    /**
+     * Check if the underlying stream is closed.
+     *
+     * @return bool
+     */
+    private function isStreamClosed(): bool
+    {
+        if ($this->stream === null) {
+            return true;
+        }
+        try {
+            $status = $this->stream->getStatus();
+            $code = is_object($status) ? ($status->code ?? -1) : 
(is_array($status) ? ($status['code'] ?? -1) : -1);
+            if ($code !== 0) {
+                return true;
+            }
+        } catch (\Exception $e) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Evict the oldest instance to make room for a new one.
+     * @return void
+     */
+    private static function evictOldest(): void
+    {
+        $oldestKey = null;
+        $oldestTime = PHP_INT_MAX;
+        foreach (self::$instanceTimestamps as $key => $timestamp) {
+            if ($timestamp < $oldestTime) {
+                $oldestTime = $timestamp;
+                $oldestKey = $key;
+            }
+        }
+
+        if ($oldestKey !== null) {
+            Logger::getInstance('TelemetrySession')->info("Evicting oldest 
session (max instance reached): {$oldestKey}");
+            if (isset(self::$instances[$oldestKey])) {
+                self::$instances[$oldestKey]->close();
+            }
+            unset(self::$instances[$oldestKey]);
+            unset(self::$instanceTimestamps[$oldestKey]);
+        }
+    }
+
+    /**
+     * Synchronize settings with broker via telemetry stream.
+     *
+     * @param object $settingsCommand Telemetry command containing settings
+     * @return bool True if settings were successfully synced
+     */
+    public function syncSettings($settingsCommand)
+    {
+        $this->lastSettingsCommand = $settingsCommand;
+        $this->isClosing = false;
+        
+        // Create stream and send settings
+        $success = $this->createStreamAndSync($settingsCommand);
+        if (!$success) {
+            return false;
+        }
+        
+        // Wait for settings confirmation with timeout
+        return $this->waitForSettingsConfirmation();
+    }
+
+    /**
+     * Wait for settings confirmation from broker with timeout.
+     * In Swoole mode, the background reader will set settingsSynced when 
SETTINGS is received.
+     * In non-Swoole mode, we poll manually with exponential backoff.
+     *
+     * @return bool True if settings confirmed before timeout
+     */
+    private function waitForSettingsConfirmation(): bool
+    {
+        $startTime = microtime(true);
+        $pollIntervalUs = 10000;
+        $maxPollIntervalUs = 200000;
+        
+        while (microtime(true) - $startTime < $this->settingsTimeout) {
+            if ($this->settingsSynced) {
+                $elapsed = round(microtime(true) - $startTime, 2);
+                $this->logger->info("Settings confirmed by broker after {$elapsed}s");
+                return true;
+            }
+            
+            if ($this->settingsError !== null) {
+                $this->logger->warning("Settings sync issue (non-fatal): " . $this->settingsError);
+                return true;

Review Comment:
   A settings stream error is treated as success here. `Producer::start()` and 
the consumer startup paths rely on `syncSettings()` to decide whether the 
client is ready, so this can mark the client running without server-accepted 
settings such as backoff, message type, assignment, or transaction callbacks. 
Please return failure/throw on stream error, and apply the same rule to the 
timeout path, unless every operation is guarded until settings are actually 
confirmed.
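
   A sketch of the stricter wait loop, assuming the session state can be read through two callables. The function name `waitForSettings()` and its parameters are hypothetical; the polling interval and cap are copied from the code under review, and only the return values on error and timeout change.

```php
<?php

/**
 * Hypothetical wait loop: returns true only when settings were confirmed.
 *
 * @param callable(): bool    $isSynced  reports whether the server confirmed settings
 * @param callable(): ?string $lastError reports a stream error message, or null
 */
function waitForSettings(callable $isSynced, callable $lastError, float $timeoutSeconds): bool
{
    $deadline = microtime(true) + $timeoutSeconds;
    $pollIntervalUs = 10000;

    while (microtime(true) < $deadline) {
        if ($isSynced()) {
            return true;
        }
        if ($lastError() !== null) {
            return false; // Stream error: the client is not ready.
        }
        usleep($pollIntervalUs);
        $pollIntervalUs = min($pollIntervalUs * 2, 200000); // Exponential backoff, capped.
    }

    return false; // Timeout: settings were never confirmed.
}
```

   The caller, for example `Producer::start()`, would then refuse to mark the client as running when this returns `false`.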



##########
php/LitePushConsumer.php:
##########
@@ -0,0 +1,334 @@
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+use Apache\Rocketmq\V2\SyncLiteSubscriptionRequest;
+use Apache\Rocketmq\V2\LiteSubscriptionAction;
+use Apache\Rocketmq\V2\Resource;
+use Apache\Rocketmq\V2\ClientType;
+
+/**
+ * LitePushConsumer - Push consumer for lite topics.
+ *
+ * Extends PushConsumer with dynamic lite topic subscription management.
+ * Instead of creating many physical topics, Lite consumers use a parent topic
+ * with logical lite topic sub-classifiers.
+ *
+ * Usage:
+ *   $consumer = new LitePushConsumer($endpoints, $consumerGroup, 
$parentTopic);
+ *   $consumer->subscribeLite('lite-topic-1', $callback);
+ *   $consumer->subscribeLite('lite-topic-2', $callback);
+ *   $consumer->start();
+ */
+class LitePushConsumer extends PushConsumer
+{
+    private readonly string $parentTopic;
+    private array $liteTopics = [];
+    private readonly int $liteSubscriptionQuota;
+    private readonly int $maxLiteTopicSize;
+    private int $syncLiteSubscriptionInterval = 30;
+    /** @var callable|null per-lite-topic callback */
+    private ?\Closure $liteMessageListener = null;
+    private int $lastSyncTime = 0;
+    private ?ProcessQueue $virtualProcessQueue = null; // ProcessQueue|null
+
+    /**
+     * Constructor.
+     *
+     * @param string $endpoints gRPC server endpoint
+     * @param string $consumerGroup Consumer group name
+     * @param string $parentTopic Parent (bound) topic
+     * @param array $options Configuration options
+     *  - clientId: string, custom client identifier (default: 
'php-push-consumer-{pid}-{time}')
+     *  - messageListener: callable|null, message consumption callback
+     *  - maxCacheMessageCount: int, max cached messages in memory (default: 
4096)
+     *  - maxCacheMessageSizeInBytes: int, max cached message total size 
(default: 67108864, 64MB)
+     *  - awaitDuration: int, long polling timeout in seconds (default: 5)
+     *  - scanIntervalSeconds: int, assignment scan interval in seconds 
(default: 5)
+     *  - receiveBatchSize: int, max messages per receive batch (default: 32)
+     *  - enableFifoConsumeAccelerator: bool, enable FIFO consume accelerator 
(default: true for Lite)
+     *  - credentials: SessionCredentials|null, AK/SK authentication 
credentials
+     *  - namespace: string, resource namespace prefix (default: '')
+     *  - tlsCredentials: TlsCredentials|null, TLS/SSL configuration
+     *  - sslEnabled: bool, enable SSL for gRPC channel (default: true)
+     *  - liteSubscriptionQuota: int, max number of lite topic subscriptions 
(default: 0 = unlimited)
+     *  - maxLiteTopicSize: int, max length of lite topic name (default: 64)
+     */
+    public function __construct(string $endpoints, string $consumerGroup, 
$parentTopic, array $options = [])
+    {
+        if (empty(trim($parentTopic))) {
+            throw new \InvalidArgumentException("LitePushConsumer parentTopic 
cannot be empty");
+        }
+        $this->parentTopic = $parentTopic;
+        $listener = $options['messageListener'] ?? null;
+        $this->liteMessageListener = $listener !== null
+            ? ($listener instanceof \Closure ? $listener : 
\Closure::fromCallable($listener))
+            : null;
+
+        $liteOptions = array_merge($options, [
+            'subscriptionExpressions' => [$parentTopic => '*'],
+            'fifo' => true,
+            'isLiteConsumer' => true,
+            'enableFifoConsumeAccelerator' => 
$options['enableFifoConsumeAccelerator'] ?? true,
+            'messageListener' => $this->liteMessageListener,
+        ]);
+
+        parent::__construct($endpoints, $consumerGroup, $liteOptions);
+
+        $this->liteSubscriptionQuota = $options['liteSubscriptionQuota'] ?? 0;
+        $this->maxLiteTopicSize = $options['maxLiteTopicSize'] ?? 64;
+    }
+
+    /**
+     * Subscribe to a lite topic.
+     *
+     * @param string $liteTopic Lite topic name
+     * @param callable|null $listener Optional per-lite-topic callback
+     * @return $this
+     */
+    public function subscribeLite(string $liteTopic, ?callable $listener = 
null): self
+    {
+        $this->checkNotRunning();
+
+        if (strlen($liteTopic) > $this->maxLiteTopicSize) {
+            throw new \RuntimeException("Lite topic name exceeds max length of 
{$this->maxLiteTopicSize}");
+        }
+
+        if ($this->liteSubscriptionQuota > 0 && count($this->liteTopics) >= 
$this->liteSubscriptionQuota) {
+            throw new \RuntimeException("Lite subscription quota exceeded: 
{$this->liteSubscriptionQuota}");
+        }
+
+        $this->liteTopics[$liteTopic] = $listener;
+
+        return $this;
+    }
+
+    /**
+     * Get the client type for this consumer.
+     *
+     * @return int ClientType::LITE_PUSH_CONSUMER
+     */
+    protected function getClientType(): int
+    {
+        return ClientType::LITE_PUSH_CONSUMER;
+    }
+
+    /**
+     * Unsubscribe from a lite topic.
+     *
+     * @param string $liteTopic Lite topic name to remove
+     * @return $this
+     */
+    public function unsubscribeLite(string $liteTopic): self
+    {
+        $this->checkNotRunning();
+        unset($this->liteTopics[$liteTopic]);
+        return $this;
+    }
+
+    /**
+     * Get subscribed lite topics.
+     *
+     * @return array
+     */
+    public function getLiteTopics(): array
+    {
+        return array_keys($this->liteTopics);
+    }
+
+    /**
+     * Set the global lite message listener (used when no per-lite-topic 
listener is set).
+     *
+     * @param callable $listener Callback invoked for messages with no 
per-lite-topic listener
+     * @return $this
+     */
+    public function setLiteMessageListener(callable $listener): self
+    {
+        $this->liteMessageListener = $listener instanceof \Closure
+            ? $listener
+            : \Closure::fromCallable($listener);
+        return $this;
+    }
+
+    /**
+     * Start the LitePushConsumer.
+     *
+     * Overrides parent start() to sync lite subscriptions, register handlers, 
and use lite-aware consume service.
+     */
+    public function start(): void
+    {
+        if ($this->isRunning()) {
+            return;
+        }
+
+        if (empty($this->liteTopics)) {
+            throw new \RuntimeException("LitePushConsumer has no lite topics 
subscribed");
+        }
+
+        if ($this->liteMessageListener === null) {
+            throw new \RuntimeException("LitePushConsumer has no lite message 
listener");
+        }
+
+        $this->logger->info("LitePushConsumer starting, 
clientId={$this->getClientId()}, parentTopic={$this->parentTopic}");
+        parent::start();
+    }
+
+    /**
+     * Setup before the main scan loop: sync lite subscriptions and wait for 
assignments.
+     */
+    protected function onStartBeforeLoop(): void
+    {
+        $self = $this;
+        $this->telemetrySession->setOnNotifyUnsubscribeLite(function 
($notifyCmd) use ($self) {
+            $liteTopic = $notifyCmd->getLiteTopic();
+            $self->logger->info("Received NotifyUnsubscribeLite for 
liteTopic={$liteTopic}");
+            $self->handleUnsubscribeLite($liteTopic);
+        });
+        $this->syncLiteSubscriptions();
+        $this->lastSyncTime = time();
+        $pollInterval = 500000;
+        $maxAttempts = 10;
+        for ($attempt = 0; $attempt < $maxAttempts; $attempt++) {
+            try {
+                $assignments = $this->queryLiteAssignment();
+                $assignmentList = $assignments ? 
ProtobufUtil::repeatedFieldToArray($assignments->getAssignments()) : [];
+                if (!empty($assignmentList)) {
+                    $this->logger->info("Lite subscription active after " . 
($attempt + 500) . "ms" . count($assignmentList) . " assignments");
+                    return;
+                }
+            } catch (\Exception $e) {
+                $this->logger->error("Error querying lite subscription: " . 
$e->getMessage());
+            }
+            $this->logger->debug("Waiting for lite subscription to take 
effect, attempt {$attempt}/{$maxAttempts}");
+            SwooleCompat::sleep($pollInterval);
+        }
+        $this->logger->error("Lite subscription timed out waiting for 
assignments, will scan during normal cycle");
+    }
+
+    /**
+     * Handle server-initiated lite topic unsubscription.
+     *
+     * @param string $liteTopic Lite topic name to remove from subscriptions
+     * @return void
+     */
+    public function handleUnsubscribeLite(string $liteTopic): void
+    {
+        if (array_key_exists($liteTopic, $this->liteTopics)) {
+            unset($this->liteTopics[$liteTopic]);
+            $this->logger->info("Unsubscribed from lite topic: {$liteTopic}");
+        }
+    }
+
+    /**
+     * Hook called after each scan cycle to perform periodic lite sync.
+     */
+    protected function onScanCycleComplete(): void
+    {
+        $now = time();
+        if (!empty($this->liteTopics) && ($now - $this->lastSyncTime) >= 
$this->syncLiteSubscriptionInterval) {
+            $this->syncLiteSubscriptions();
+            $this->lastSyncTime = $now;
+        }
+    }
+
+    /**
+     * Sync lite subscriptions to server via SyncLiteSubscription gRPC.
+     */
+    public function syncLiteSubscriptions(): void
+    {
+        if (empty($this->liteTopics)) {
+            return;
+        }
+
+        $topicResource = new Resource();
+        $topicResource->setName($this->parentTopic);
+
+        $groupResource = new Resource();
+        $groupResource->setName($this->consumerGroup);
+
+        $request = new SyncLiteSubscriptionRequest();
+        $request->setAction(LiteSubscriptionAction::COMPLETE_ADD);
+        $request->setTopic($topicResource);
+        $request->setGroup($groupResource);
+        $request->setLiteTopicSet(array_keys($this->liteTopics));
+
+        $metadata = 
$this->buildMetadata(ClientConstants::GRPC_SYNC_LITE_MESSAGE_TIMEOUT / 1000);
+
+        try {
+            list($response, $status) = 
$this->getClient()->SyncLiteSubscription($request, $metadata, 
$this->getCallOptions())->wait();

Review Comment:
   Failures from `SyncLiteSubscription` are only logged, and the caller 
continues startup. This can leave the consumer running without server-side lite 
subscriptions or assignments. `syncLiteSubscriptions()` should return/throw on 
non-OK status and the startup path should fail rather than proceeding silently.
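
   A minimal sketch of the fail-fast behavior, assuming the call yields a gRPC
   transport code and a RocketMQ response code. `LiteSyncException` and
   `assertSyncOk()` are hypothetical names, not part of this PR; `20000` is
   the OK code that `StatusChecker` already checks, and `0` is gRPC's OK
   status.

   ```php
   <?php
   // Hypothetical exception type, used only for this illustration.
   final class LiteSyncException extends \RuntimeException {}

   /**
    * Throw unless both the transport status and the response status are OK.
    *
    * @param int    $grpcCode     Transport-level gRPC code (0 means OK)
    * @param int    $responseCode RocketMQ status code (20000 means OK)
    * @param string $message      Optional server-provided message
    */
   function assertSyncOk(int $grpcCode, int $responseCode, string $message = ''): void
   {
       if ($grpcCode !== 0) {
           throw new LiteSyncException(
               "SyncLiteSubscription transport failure: code={$grpcCode}"
           );
       }
       if ($responseCode !== 20000) {
           throw new LiteSyncException(
               "SyncLiteSubscription rejected: code={$responseCode} {$message}"
           );
       }
   }
   ```

   If the startup path lets this exception propagate instead of catching and
   logging it, `start()` fails visibly when the server has not accepted the
   subscriptions.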



##########
php/StatusChecker.php:
##########
@@ -0,0 +1,158 @@
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+class StatusChecker
+{
+    private static $badRequestCodes = [
+        40000, 40001, 40002, 40004, 40005, 40006,
+    ];
+
+    private static $statusMessageMap = [
+        'BAD_REQUEST' => 1,
+        'ILLEGAL_TOPIC' => 1,
+        'ILLEGAL_CONSUMER_GROUP' => 1,
+        'ILLEGAL_MESSAGE_TAG' => 1,
+        'ILLEGAL_MESSAGE_KEY' => 1,
+        'ILLEGAL_MESSAGE_GROUP' => 1,
+        'ILLEGAL_MESSAGE_PROPERTY_KEY' => 1,
+        'INVALID_TRANSACTION_ID' => 1,
+        'MESSAGE_CORRUPTED' => 1,
+        'ILLEGAL_FILTER_EXPRESSION' => 1,
+        'ILLEGAL_FILTER_SQL92_EXPRESSION' => 1,
+        'INVALID_RECEIPT_HANDLE' => 1,
+        'WRONG_ORGANIZATION' => 1,
+        'ILLEGAL_LITE_TOPIC' => 1,
+        'ILLEGAL_GLOBAL_BID' => 1,
+
+        'UNAUTHORIZED' => 2,
+
+        'PAYMENT_REQUIRED' => 3,
+
+        'FORBIDDEN' => 4,
+        'FORBIDDEN_REUSE' => 4,
+
+        'NOT_FOUND' => 5,
+        'TOPIC_NOT_FOUND' => 5,
+        'CONSUMER_GROUP_NOT_FOUND' => 5,
+
+        'PAYLOAD_TOO_LARGE' => 6,
+        'MESSAGE_BODY_TOO_LARGE' => 6,
+
+        'PAYLOAD_EMPTY' => 7,
+        'MESSAGE_BODY_EMPTY' => 7,
+
+        'TOO_MANY_REQUESTS' => 8,
+
+        'LITE_TOPIC_QUOTA_EXCEEDED' => 9,
+        'LITE_SUBSCRIPTION_QUOTA_EXCEEDED' => 10,
+
+        'REQUEST_HEADER_FIELDS_TOO_LARGE' => 11,
+        'MESSAGE_PROPERTIES_TOO_LARGE' => 11,
+
+        'INTERNAL_ERROR' => 12,
+        'INTERNAL_SERVER_ERROR' => 12,
+        'HA_NOT_AVAILABLE' => 12,
+
+        'PROXY_TIMEOUT' => 13,
+        'MASTER_PERSISTENCE_TIMEOUT' => 13,
+        'SLAVE_PERSISTENCE_TIMEOUT' => 13,
+
+        'UNSUPPORTED' => 14,
+        'VERSION_UNSUPPORTED' => 14,
+        'VERIFY_FIFO_MESSAGE_UNSUPPORTED' => 14,
+    ];
+
+    /**
+     * Check gRPC status and throw appropriate exception on failure.
+     *
+     * @param \Google\Rpc\Status|null $status gRPC status object
+     * @param string $detailMessage Optional additional detail message
+     * @return void
+     * @throws 
BadRequestException|UnauthorizedException|ForbiddenException|NotFoundException|InternalErrorException|TooManyRequestsException|\Exception
+     */
+    public static function check($status, $detailMessage = '')
+    {
+        if ($status === null) {
+            return;
+        }
+
+        $code = $status->getCode();
+        if ($code === 20000) {
+            return;
+        }
+
+        $message = $status->getMessage();
+        if (!empty($detailMessage)) {
+            $message = $message . '; detail: ' . $detailMessage;
+        }
+
+        $exceptionClass = self::resolveExceptionClass($code);
+        throw new $exceptionClass($code, $message);
+    }
+
+    /**
+     * Resolve the exception class name for a given status code.
+     *
+     * @param int $code gRPC status code
+     * @return string Fully qualified exception class name
+     */
+    private static function resolveExceptionClass($code)
+    {
+        if (in_array($code, self::$badRequestCodes)) {
+            return BadRequestException::class;
+        }
+
+        return match ($code) {
+            40100 => UnauthorizedException::class,
+            40200 => PaymentRequiredException::class,
+            40300 => ForbiddenException::class,
+            40400, 40401, 40402 => NotFoundException::class,
+            41300, 41301 => PayloadTooLargeException::class,
+            41400, 41401 => PayloadEmptyException::class,
+            42900 => TooManyRequestsException::class,
+            40901 => LiteTopicQuotaExceededException::class,

Review Comment:
   The generated enum values for these lite quota errors are `42901`/`42902`, 
not `40901`/`40902`. With the current mapping, server responses with 
`LITE_TOPIC_QUOTA_EXCEEDED` or `LITE_SUBSCRIPTION_QUOTA_EXCEEDED` will fall 
through to the generic 4xx exception path instead of the specific lite quota 
exceptions.
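
   A possible shape for the corrected mapping, as a sketch only. The numeric
   values are the ones quoted above; the function name and the
   `LiteSubscriptionQuotaExceededException` class name are assumptions for
   illustration and may not match what the PR defines.

   ```php
   <?php
   // Values as stated in this review comment; ideally these would come from
   // the generated enum rather than being repeated as literals.
   const LITE_TOPIC_QUOTA_EXCEEDED = 42901;
   const LITE_SUBSCRIPTION_QUOTA_EXCEEDED = 42902;

   /**
    * Return the exception class name for a lite quota code, or null when the
    * code is not a lite quota code and other mappings should be consulted.
    */
   function resolveLiteQuotaException(int $code): ?string
   {
       return match ($code) {
           LITE_TOPIC_QUOTA_EXCEEDED => 'LiteTopicQuotaExceededException',
           LITE_SUBSCRIPTION_QUOTA_EXCEEDED => 'LiteSubscriptionQuotaExceededException',
           default => null,
       };
   }
   ```

   Referencing the generated enum constants directly, rather than literals,
   would likely prevent this kind of mismatch from recurring.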



##########
php/Producer.php:
##########
@@ -1,73 +1,508 @@
-<?php
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache\Rocketmq;
-
-require 'vendor/autoload.php';
-
-
-use Apache\Rocketmq\V2\MessageQueue;
-use Apache\Rocketmq\V2\MessagingServiceClient;
-use Apache\Rocketmq\V2\QueryRouteRequest;
-use Apache\Rocketmq\V2\ReceiveMessageRequest;
-use Apache\Rocketmq\V2\Resource;
-use Grpc\ChannelCredentials;
-use const Grpc\STATUS_OK;
-
-class Producer
-{
-
-    public function init()
-    {
-        /**
-         * Client ID is currently concatenated using a fixed host name to
-         * facilitate code debugging.
-         */
-        $clientId = 'missyourlove' . '@' . posix_getpid() . '@' . rand(0, 10) 
. '@' . $this->getRandStr(10);
-        $client = new 
MessagingServiceClient('rmq-cn-cs02xhf2k01.cn-hangzhou.rmq.aliyuncs.com:8080', [
-            'credentials' => ChannelCredentials::createInsecure(),
-            'update_metadata' => function ($metaData) use ($clientId) {
-                $metaData['headers'] = ['clientID' => $clientId]; // Pass the 
ClientID to the server through the header
-                return $metaData;
-            }
-        ]);
-
-        $qr = new QueryRouteRequest();
-        $rs = new Resource();
-        $rs->setResourceNamespace('');
-        $rs->setName('normal_topic');
-        $qr->setTopic($rs);
-       $status = $client->QueryRoute($qr)->wait();
-       var_dump($status); // This prints out the response data returned by the 
server
-    }
-
-    public function getRandStr($length){
-        //Character combinations
-        $str = 
'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
-        $len = strlen($str)-1;
-        $randstr = '';
-        for ($i=0;$i<$length;$i++) {
-            $num=mt_rand(0,$len);
-            $randstr .= $str[$num];
-        }
-        return $randstr;
-    }
-}
-
-$xx = new Producer();
-$xx->init();
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+use Apache\Rocketmq\V2\MessagingServiceClient;
+use Apache\Rocketmq\V2\Resource;
+use Apache\Rocketmq\V2\Message;
+use Apache\Rocketmq\V2\SystemProperties;
+use Apache\Rocketmq\V2\Settings;
+use Apache\Rocketmq\V2\ClientType;
+use Apache\Rocketmq\V2\UA;
+use Apache\Rocketmq\V2\Language;
+use Apache\Rocketmq\V2\TelemetryCommand;
+use Apache\Rocketmq\V2\Publishing;
+
+/**
+ * Producer — Message producer.
+ *
+ * Core features: singleton TelemetrySession, PublishingLoadBalancer, retry,
+ * transaction support (via TransactionTrait), heartbeat (via 
HeartbeatManager),
+ * interceptor support, Swoole coroutine async support.
+ *
+ * Send and recall logic is delegated to SendMessageHandler and 
RecallMessageHandler.
+ * Use ProducerBuilder for convenient construction.
+ */
+class Producer implements TransactionCommitter, ClientTraitProvider
+{
+    use ClientTrait {
+        buildMetadata as public;
+        getOperationTimeout as public;
+        parseEndpoints as public;
+    }
+    use TransactionTrait;
+
+    private readonly MessagingServiceClient $client;
+    private readonly TelemetrySession $telemetrySession;
+    private readonly PublishingRouteManager $routeManager;
+    private readonly ProducerSettings $settings;
+    protected MessageValidator $validator;
+    private bool $isRunning = false;
+    private bool $shutdownRequested = false;
+    private array $interceptors = [];
+    private readonly Logger $logger;
+    private readonly HeartbeatManager $heartbeatManager;
+    private readonly SendMessageHandler $sendHandler;
+    private readonly RecallMessageHandler $recallHandler;
+
+    /**
+     * @param string $endpoints gRPC server endpoint
+     * @param array $options Configuration options (use ProducerBuilder 
instead)
+     *                      - clientId: string, custom client identifier 
(default: 'php-producer-{pid}-{time}')
+     *                      - maxAttempts: int, max retry attempts on send 
failure (default: 3)
+     *                      - requestTimeout: int, gRPC request timeout in ms 
(default: 3000)
+     *                      - topics: string[], topics to publish messages to 
(default: [])
+     *                      - namespace: string, resource namespace prefix 
(default: '')
+     *                      - credentials: SessionCredentials|null, AK/SK 
authentication credentials
+     *                      - validateMessageType: bool, validate message type 
against route (default: true)
+     *                      - maxBodySizeBytes: int, max message body size in 
bytes (default: 4194304)
+     *                      - tlsCredentials: TlsCredentials|null, TLS/SSL 
configuration
+     *                      - sslEnabled: bool, enable SSL for gRPC channel 
(default: true)
+     * @deprecated Use ProducerBuilder instead for better type safety and IDE 
support.
+     */
+    public function __construct(
+        private readonly string $endpoints,
+        array $options = []
+    ) {
+        $this->settings = new ProducerSettings($endpoints, $options);
+        $this->validator = new MessageValidator(
+            $options['maxBodySizeBytes'] ?? 4194304,
+            $options['validateMessageType'] ?? true
+        );
+        $this->logger = Logger::getInstance('Producer');
+
+        $this->client = RpcClientManager::getInstance()->getClient($endpoints, 
[
+            'tlsCredentials' => $this->settings->getTlsCredentials(),
+            'sslEnabled' => $this->settings->isSslEnabled(),
+        ]);
+
+        $this->telemetrySession = TelemetrySession::getInstance(
+            $this->client, $endpoints, $this->settings->getClientId(),
+            $this->settings->getCredentials(), $this->settings->getNamespace()
+        );
+        $this->routeManager = new PublishingRouteManager($this->client, 
$endpoints, $this);
+        $this->heartbeatManager = new HeartbeatManager(
+            $this->routeManager, $this->client, $this,
+            $this->settings->getTlsCredentials(), 
$this->settings->isSslEnabled()
+        );
+
+        $metadataBuilder = fn(?int $timeoutMs = null) => 
$this->buildMetadata($timeoutMs);
+        $callOptionsResolver = fn(?int $overrideTimeout = null) => 
$this->getCallOptions($overrideTimeout);
+        $operationTimeoutFn = fn(string $op) => 
$this->getOperationTimeout($op);
+        $interceptorExecutor = function (string $hookPoint, array $context = 
[]) {
+            $this->executeInterceptors($hookPoint, $context);
+        };
+
+        $this->sendHandler = new SendMessageHandler(
+            $this->client,
+            $this->settings,
+            $this->validator,
+            $this->routeManager,
+            $interceptorExecutor,
+            $metadataBuilder,
+            $callOptionsResolver,
+            $operationTimeoutFn,
+        );
+
+        $this->recallHandler = new RecallMessageHandler(
+            $this->client,
+            $this->settings,
+            $metadataBuilder,
+            $callOptionsResolver,
+        );
+    }
+
+    // ==================== Lifecycle ====================
+
+    public function start(): void
+    {
+        if ($this->isRunning) {
+            return;
+        }
+
+        try {
+            Logger::getInstance('Producer')->info("Begin to start the rocketmq 
producer, clientId={$this->settings->getClientId()}");
+            $this->establishTelemetrySession();
+            $this->registerSettingsCallback();
+            $this->registerTransactionCheckerCallback();
+
+            $this->routeManager->warmUp($this->settings->getTopics());
+
+            $this->isRunning = true;
+            $this->heartbeatManager->start();
+
+            Logger::getInstance('Producer')->info("The rocketmq producer 
starts successfully, clientId={$this->settings->getClientId()}");
+        } catch (\Exception $e) {
+            Logger::getInstance('Producer')->error("Failed to start: " . 
$e->getMessage());
+            $this->shutdown();
+            throw $e;
+        }
+    }
+
+    public function shutdown(): void
+    {
+        if (!$this->isRunning) {
+            return;
+        }
+
+        $this->shutdownRequested = true;
+        $this->logger->info("Begin to shutdown the rocketmq producer, 
clientId={$this->settings->getClientId()}");
+
+        if (SwooleCompat::isAvailable() && SwooleCompat::inCoroutine()) {
+            \Swoole\Coroutine::sleep(1);
+        }
+
+        $this->heartbeatManager->stop();
+        $this->heartbeatManager->notifyClientTermination();
+
+        if ($this->telemetrySession) {
+            $this->telemetrySession->close();
+        }
+
+        $this->isRunning = false;
+        $this->logger->info("Shutdown the rocketmq producer successfully, 
clientId={$this->settings->getClientId()}");
+    }
+
+    public function __destruct()
+    {
+        $this->shutdown();
+    }
+
+    // ==================== Send ====================
+
+    /**
+     * Send a message
+     * @param Message $message to send
+     * @return array Send result containing:
+     * - messageId: messageId
+     * - messageQueue: messageQueue
+     * - offset: offset
+     * - requestId: requestId
+     * - sendTime: sendTime
+     * - transactionId: transactionId
+     * - transactionState: transactionState
+     * @throws \Exception if producer is not running
+     */
+    public function send(Message $message): array
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+        return $this->sendHandler->send($message);
+    }
+
+    /**
+     * Send a message asynchronously
+     * @param Message $message to send
+     * @return array|\Generator
+     */
+    public function sendAsync(Message $message): array|\Generator
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+        return $this->sendHandler->sendAsync($message);
+    }
+
+    // ==================== Batch Send ====================
+
+    /**
+     * Send a batch of messages
+     * @param array $messages to send
+     * @return array Send result
+     * @throws \Exception if producer is not running
+     */
+    public function sendBatch(array $messages): array
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+        return $this->sendHandler->sendBatch($messages);
+    }
+
+    /**
+     * Send a batch of messages asynchronously
+     * @param array $messages to send
+     * @return array|\Generator
+     */
+    public function sendBatchAsync(array $messages): array|\Generator
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+        return $this->sendHandler->sendBatchAsync($messages);
+    }
+
+    // ==================== Convenience Send Methods ====================
+
+    /**
+     * Send a priority message
+     * @param string $topic Topic name
+     * @param string $body Message body
+     * @param int $priority Message priority
+     * @param string $tag Message tag
+     * @return array Send result, in the same shape as send()
+     * @throws \Exception if producer is not running
+     */
+    public function sendPriorityMessage(string $topic, string $body, int 
$priority, string $tag = ''): array
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+        return $this->send($this->sendHandler->buildConvenienceMessage($topic, 
$body, $tag, function (SystemProperties $sp) use ($priority) {
+            $sp->setPriority($priority);

Review Comment:
   This convenience API bypasses the priority range validation that 
`MessageBuilder::setPriority()` applies (`1..9`). Invalid priorities can be 
sent directly through this method. Please reuse the same validation here or 
route through the builder method.
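
   A minimal sketch of a shared range check, assuming the `1..9` bounds
   mentioned above. `assertValidPriority()` is a hypothetical helper name, not
   an existing method in this PR.

   ```php
   <?php
   /**
    * Return the priority unchanged when it lies in 1..9, otherwise throw.
    * The bounds come from the review comment; the helper is illustrative.
    */
   function assertValidPriority(int $priority): int
   {
       if ($priority < 1 || $priority > 9) {
           throw new \InvalidArgumentException(
               "Priority must be between 1 and 9, got {$priority}"
           );
       }
       return $priority;
   }
   ```

   The convenience method could then call
   `$sp->setPriority(assertValidPriority($priority))`, and
   `MessageBuilder::setPriority()` could use the same helper so that both
   paths enforce one rule.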



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

