zhaohai666 commented on code in PR #1250:
URL: https://github.com/apache/rocketmq-clients/pull/1250#discussion_r3315064885


##########
php/Producer.php:
##########
@@ -1,73 +1,1404 @@
-<?php
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache\Rocketmq;
-
-require 'vendor/autoload.php';
-
-
-use Apache\Rocketmq\V2\MessageQueue;
-use Apache\Rocketmq\V2\MessagingServiceClient;
-use Apache\Rocketmq\V2\QueryRouteRequest;
-use Apache\Rocketmq\V2\ReceiveMessageRequest;
-use Apache\Rocketmq\V2\Resource;
-use Grpc\ChannelCredentials;
-use const Grpc\STATUS_OK;
-
-class Producer
-{
-
-    public function init()
-    {
-        /**
-         * Client ID is currently concatenated using a fixed host name to
-         * facilitate code debugging.
-         */
-        $clientId = 'missyourlove' . '@' . posix_getpid() . '@' . rand(0, 10) 
. '@' . $this->getRandStr(10);
-        $client = new 
MessagingServiceClient('rmq-cn-cs02xhf2k01.cn-hangzhou.rmq.aliyuncs.com:8080', [
-            'credentials' => ChannelCredentials::createInsecure(),
-            'update_metadata' => function ($metaData) use ($clientId) {
-                $metaData['headers'] = ['clientID' => $clientId]; // Pass the 
ClientID to the server through the header
-                return $metaData;
-            }
-        ]);
-
-        $qr = new QueryRouteRequest();
-        $rs = new Resource();
-        $rs->setResourceNamespace('');
-        $rs->setName('normal_topic');
-        $qr->setTopic($rs);
-       $status = $client->QueryRoute($qr)->wait();
-       var_dump($status); // This prints out the response data returned by the 
server
-    }
-
-    public function getRandStr($length){
-        //Character combinations
-        $str = 
'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
-        $len = strlen($str)-1;
-        $randstr = '';
-        for ($i=0;$i<$length;$i++) {
-            $num=mt_rand(0,$len);
-            $randstr .= $str[$num];
-        }
-        return $randstr;
-    }
-}
-
-$xx = new Producer();
-$xx->init();
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+require_once __DIR__ . '/autoload.php';
+require_once __DIR__ . '/RpcClientManager.php';
+require_once __DIR__ . '/MessageId.php';
+require_once __DIR__ . '/MessageIdImpl.php';
+require_once __DIR__ . '/MessageIdCodec.php';
+require_once __DIR__ . '/TelemetrySession.php';
+require_once __DIR__ . '/ConsumeResult.php';
+require_once __DIR__ . '/Logger.php';
+require_once __DIR__ . '/Signature.php';
+require_once __DIR__ . '/ClientConstants.php';
+require_once __DIR__ . '/ClientTrait.php';
+require_once __DIR__ . '/TransactionChecker.php';
+require_once __DIR__ . '/ExponentialBackoffRetryPolicy.php';
+require_once __DIR__ . '/SwooleCompat.php';
+require_once __DIR__ . '/ProtobufUtil.php';
+require_once __DIR__ . '/PublishingLoadBalancer.php';
+require_once __DIR__ . '/Transaction.php';
+require_once __DIR__ . '/IntMath.php';
+require_once __DIR__ . '/MessageHookPoints.php';
+
+use Apache\Rocketmq\V2\MessagingServiceClient;
+use Apache\Rocketmq\V2\Permission;
+use Apache\Rocketmq\V2\QueryRouteRequest;
+use Apache\Rocketmq\V2\SendMessageRequest;
+use Apache\Rocketmq\V2\EndTransactionRequest;
+use Apache\Rocketmq\V2\RecallMessageRequest;
+use Apache\Rocketmq\V2\HeartbeatRequest;
+use Apache\Rocketmq\V2\Resource;
+use Apache\Rocketmq\V2\Message;
+use Apache\Rocketmq\V2\SystemProperties;
+use Apache\Rocketmq\V2\Settings;
+use Apache\Rocketmq\V2\ClientType;
+use Apache\Rocketmq\V2\UA;
+use Apache\Rocketmq\V2\Language;
+use Apache\Rocketmq\V2\TelemetryCommand;
+use Apache\Rocketmq\V2\Publishing;
+use Apache\Rocketmq\V2\TransactionResolution;
+use Apache\Rocketmq\V2\TransactionSource;
+use Apache\Rocketmq\V2\Endpoints;
+use Apache\Rocketmq\V2\Address;
+use Apache\Rocketmq\V2\AddressScheme;
+use Apache\Rocketmq\V2\NotifyClientTerminationRequest;
+use Grpc\ChannelCredentials;
+use Google\Protobuf\Timestamp;
+use Google\Protobuf\Duration;
+use Apache\Rocketmq\V2\Encoding;
+use Apache\Rocketmq\V2\MessageType as V2MessageType;
+
+/**
+ * Producer - Message producer
+ *
+ * Core features:
+ * 1. Singleton TelemetrySession management
+ * 2. PublishingLoadBalancer (Topic-level MessageQueue load balancing)
+ * 3. Complete state management (FSM)
+ * 4. Transaction message support
+ * 5. Delayed message recall
+ * 6. Interceptor support (Hook Points)
+ * 7. ExponentialBackoffRetryPolicy wired for retries
+ * 8. TransactionChecker for orphaned transaction recovery
+ * 9. Batch send support
+ * 10. Swoole coroutine async support
+ */
+class Producer
+{
+    use ClientTrait;
+
+    private $client;
+    private $endpoints;
+    private $clientId;
+    private $telemetrySession;
+    private $publishingRouteDataCache = [];
+    private $isRunning = false;
+    private $shutdownRequested = false;
+    private $maxAttempts = 3;
+    private $requestTimeout = 3000; // ms
+    private $topics = [];
+    private $isolatedEndpoints = [];
+    private $namespace = '';
+    private $logger;
+    private $credentials = null; // SessionCredentials for AK/SK auth
+    private $validateMessageType = true;
+    private $maxBodySizeBytes = 4194304; // 4MB default
+    private $heartbeatPid = null;
+    private $lastHeartbeatTime = 0;
+    private $interceptors = [];
+    private $transactionChecker = null;
+    private $localTransactionExecuter = null;
+    private $retryPolicy = null;
+    private $tlsCredentials = null;
+
+    /**
+     * Constructor
+     *
+     * @param string $endpoints gRPC server endpoint
+     * @param array $options Configuration options
+     * @deprecated Use ProducerBuilder instead.
+     */
+    public function __construct($endpoints, $options = [])
+    {
+        $this->endpoints = $endpoints;
+        $this->clientId = $options['clientId'] ?? ('php-producer-' . 
getmypid() . '-' . time());
+        $this->maxAttempts = $options['maxAttempts'] ?? 3;
+        $this->requestTimeout = $options['requestTimeout'] ?? 3000;
+        $this->topics = $options['topics'] ?? [];
+        $this->namespace = $options['namespace'] ?? '';
+        $this->validateMessageType = $options['validateMessageType'] ?? true;
+        $this->maxBodySizeBytes = $options['maxBodySizeBytes'] ?? 4194304;
+        $this->tlsCredentials = $options['tlsCredentials'] ?? null;
+
+        // Set AK/SK credentials if provided
+        if (isset($options['credentials']) && $options['credentials'] 
instanceof SessionCredentials) {
+            $this->credentials = $options['credentials'];
+        }
+
+        // Initialize retry policy
+        $this->retryPolicy = new 
ExponentialBackoffRetryPolicy($this->maxAttempts, 1000, 30000, 2.0);
+
+        $this->logger = Logger::getInstance('Producer');
+
+        // Use RpcClientManager for connection pooling
+        $this->client = RpcClientManager::getInstance()->getClient($endpoints, 
[
+            'tlsCredentials' => $this->tlsCredentials,
+        ]);
+
+        // Initialize Telemetry Session (singleton)
+        $this->telemetrySession = TelemetrySession::getInstance($this->client, 
$endpoints, $this->clientId, $this->credentials, $this->namespace);
+    }
+
+    /**
+     * Set transaction checker for orphaned transaction recovery.
+     */
+    public function setTransactionChecker(TransactionChecker $checker): self
+    {
+        $this->transactionChecker = $checker;
+        return $this;
+    }
+
+    /**
+     * Set local transaction executer for auto commit/rollback of 
half-messages.
+     *
+     * @param LocalTransactionExecuter $executer
+     * @return $this
+     */
+    public function setLocalTransactionExecuter(LocalTransactionExecuter 
$executer): self
+    {
+        $this->localTransactionExecuter = $executer;
+        return $this;
+    }
+
+    public function start()
+    {
+        if ($this->isRunning) {
+            return;
+        }
+
+        try {
+            Logger::getInstance('Producer')->info("Begin to start the rocketmq 
producer, clientId={$this->clientId}");
+
+            // Establish Telemetry Session
+            $this->establishTelemetrySession();
+
+            // Register settings change callback
+            $this->registerSettingsCallback();
+
+            // Register transaction checker callback if set
+            $this->registerTransactionCheckerCallback();
+
+            // Warm up route cache
+            foreach ($this->topics as $topic) {
+                $this->getPublishingLoadBalancer($topic);
+            }
+
+            $this->isRunning = true;
+
+            // Start periodic heartbeat
+            $this->startHeartbeat();
+
+            Logger::getInstance('Producer')->info("The rocketmq producer 
starts successfully, clientId={$this->clientId}");
+        } catch (\Exception $e) {
+            Logger::getInstance('Producer')->error("Failed to start: " . 
$e->getMessage());
+            $this->shutdown();
+            throw $e;
+        }
+    }
+
+    /**
+     * Synchronously send a message
+     *
+     * @param Message $message Message object
+     * @return array Send result ['messageId' => ..., 'transactionId' => ..., 
'status' => ...]
+     */
+    public function send(Message $message)
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+
+        $this->validateMessage($message);
+
+        $topic = $message->getTopic()->getName();
+        $loadBalancer = $this->getPublishingLoadBalancer($topic);
+        $candidates = 
$loadBalancer->takeMessageQueue($this->getIsolatedBrokerNames(), 
$this->maxAttempts);
+
+        if (empty($candidates)) {
+            throw new \RuntimeException("No available message queue for topic: 
{$topic}");
+        }
+
+        if ($this->validateMessageType) {
+            $msgType = $this->detectMessageType($message, false);
+            $loadBalancer->validateMessageTypeAgainstQueue($candidates[0], 
$msgType, $topic);
+        }
+
+        $request = $this->wrapSendMessageRequest([$message], $candidates[0]);
+
+        return $this->sendMessageWithRetry($request, $message, $candidates, 
$this->maxAttempts);
+    }
+
+    /**
+     * Asynchronously send a message using Swoole coroutine if available.
+     *
+     * @param Message $message
+     * @return array|\Generator
+     */
+    public function sendAsync(Message $message)
+    {
+        if (SwooleCompat::isAvailable() && SwooleCompat::inCoroutine()) {
+            $channel = new \Swoole\Coroutine\Channel(1);
+            \Swoole\Coroutine::create(function () use ($message, $channel) {
+                try {
+                    $result = $this->send($message);
+                    $channel->push(['success' => true, 'result' => $result]);
+                } catch (\Throwable $e) {
+                    $channel->push(['success' => false, 'error' => $e]);
+                }
+            });
+            return $channel->pop();
+        }
+        yield $this->send($message);
+    }
+
+    /**
+     * Batch send messages. All messages must share the same topic.
+     *
+     * @param Message[] $messages
+     * @return array Array of send results
+     */
+    public function sendBatch(array $messages)
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+
+        if (empty($messages)) {
+            throw new \InvalidArgumentException("Batch messages cannot be 
empty");
+        }
+
+        // Validate all messages share the same topic
+        $topic = $messages[0]->getTopic()->getName();
+        $messageTypes = [];
+        $messageGroups = [];
+        $hasFifoMessage = false;
+        foreach ($messages as $msg) {
+            if ($msg->getTopic()->getName() !== $topic) {
+                throw new \InvalidArgumentException("All messages in a batch 
must have the same topic");
+            }
+            $this->validateMessage($msg);
+            if ($this->validateMessageType) {
+                $msgType = $this->detectMessageType($msg, false);
+                $messageTypes[] = $msgType;
+            }
+            $sysProps = $msg->getSystemProperties();
+            if ($sysProps && method_exists($sysProps, 'hasMessageGroup') && 
$sysProps->hasMessageGroup()) {
+                $hasFifoMessage = true;
+                $messageGroups[] = $sysProps->getMessageGroup();
+            }
+        }
+        if ($this->validateMessageType && count(array_unique($messageTypes)) > 
1) {
+            throw new \InvalidArgumentException('Messages to send different 
message types , please check');
+        }
+        if ($hasFifoMessage && count(array_unique($messageGroups)) > 1) {
+            throw new \InvalidArgumentException("FIFO messages to send have 
different message groups, please check");
+        }
+
+        $loadBalancer = $this->getPublishingLoadBalancer($topic);
+        $isolatedBroker = array_keys($this->isolatedEndpoints);
+        $messageQueue = $loadBalancer->takeMessageQueue($isolatedBroker, 1);

Review Comment:
   Fixed



##########
php/Producer.php:
##########
@@ -1,73 +1,1404 @@
-<?php
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache\Rocketmq;
-
-require 'vendor/autoload.php';
-
-
-use Apache\Rocketmq\V2\MessageQueue;
-use Apache\Rocketmq\V2\MessagingServiceClient;
-use Apache\Rocketmq\V2\QueryRouteRequest;
-use Apache\Rocketmq\V2\ReceiveMessageRequest;
-use Apache\Rocketmq\V2\Resource;
-use Grpc\ChannelCredentials;
-use const Grpc\STATUS_OK;
-
-class Producer
-{
-
-    public function init()
-    {
-        /**
-         * Client ID is currently concatenated using a fixed host name to
-         * facilitate code debugging.
-         */
-        $clientId = 'missyourlove' . '@' . posix_getpid() . '@' . rand(0, 10) 
. '@' . $this->getRandStr(10);
-        $client = new 
MessagingServiceClient('rmq-cn-cs02xhf2k01.cn-hangzhou.rmq.aliyuncs.com:8080', [
-            'credentials' => ChannelCredentials::createInsecure(),
-            'update_metadata' => function ($metaData) use ($clientId) {
-                $metaData['headers'] = ['clientID' => $clientId]; // Pass the 
ClientID to the server through the header
-                return $metaData;
-            }
-        ]);
-
-        $qr = new QueryRouteRequest();
-        $rs = new Resource();
-        $rs->setResourceNamespace('');
-        $rs->setName('normal_topic');
-        $qr->setTopic($rs);
-       $status = $client->QueryRoute($qr)->wait();
-       var_dump($status); // This prints out the response data returned by the 
server
-    }
-
-    public function getRandStr($length){
-        //Character combinations
-        $str = 
'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
-        $len = strlen($str)-1;
-        $randstr = '';
-        for ($i=0;$i<$length;$i++) {
-            $num=mt_rand(0,$len);
-            $randstr .= $str[$num];
-        }
-        return $randstr;
-    }
-}
-
-$xx = new Producer();
-$xx->init();
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+require_once __DIR__ . '/autoload.php';
+require_once __DIR__ . '/RpcClientManager.php';
+require_once __DIR__ . '/MessageId.php';
+require_once __DIR__ . '/MessageIdImpl.php';
+require_once __DIR__ . '/MessageIdCodec.php';
+require_once __DIR__ . '/TelemetrySession.php';
+require_once __DIR__ . '/ConsumeResult.php';
+require_once __DIR__ . '/Logger.php';
+require_once __DIR__ . '/Signature.php';
+require_once __DIR__ . '/ClientConstants.php';
+require_once __DIR__ . '/ClientTrait.php';
+require_once __DIR__ . '/TransactionChecker.php';
+require_once __DIR__ . '/ExponentialBackoffRetryPolicy.php';
+require_once __DIR__ . '/SwooleCompat.php';
+require_once __DIR__ . '/ProtobufUtil.php';
+require_once __DIR__ . '/PublishingLoadBalancer.php';
+require_once __DIR__ . '/Transaction.php';
+require_once __DIR__ . '/IntMath.php';
+require_once __DIR__ . '/MessageHookPoints.php';
+
+use Apache\Rocketmq\V2\MessagingServiceClient;
+use Apache\Rocketmq\V2\Permission;
+use Apache\Rocketmq\V2\QueryRouteRequest;
+use Apache\Rocketmq\V2\SendMessageRequest;
+use Apache\Rocketmq\V2\EndTransactionRequest;
+use Apache\Rocketmq\V2\RecallMessageRequest;
+use Apache\Rocketmq\V2\HeartbeatRequest;
+use Apache\Rocketmq\V2\Resource;
+use Apache\Rocketmq\V2\Message;
+use Apache\Rocketmq\V2\SystemProperties;
+use Apache\Rocketmq\V2\Settings;
+use Apache\Rocketmq\V2\ClientType;
+use Apache\Rocketmq\V2\UA;
+use Apache\Rocketmq\V2\Language;
+use Apache\Rocketmq\V2\TelemetryCommand;
+use Apache\Rocketmq\V2\Publishing;
+use Apache\Rocketmq\V2\TransactionResolution;
+use Apache\Rocketmq\V2\TransactionSource;
+use Apache\Rocketmq\V2\Endpoints;
+use Apache\Rocketmq\V2\Address;
+use Apache\Rocketmq\V2\AddressScheme;
+use Apache\Rocketmq\V2\NotifyClientTerminationRequest;
+use Grpc\ChannelCredentials;
+use Google\Protobuf\Timestamp;
+use Google\Protobuf\Duration;
+use Apache\Rocketmq\V2\Encoding;
+use Apache\Rocketmq\V2\MessageType as V2MessageType;
+
+/**
+ * Producer - Message producer
+ *
+ * Core features:
+ * 1. Singleton TelemetrySession management
+ * 2. PublishingLoadBalancer (Topic-level MessageQueue load balancing)
+ * 3. Complete state management (FSM)
+ * 4. Transaction message support
+ * 5. Delayed message recall
+ * 6. Interceptor support (Hook Points)
+ * 7. ExponentialBackoffRetryPolicy wired for retries
+ * 8. TransactionChecker for orphaned transaction recovery
+ * 9. Batch send support
+ * 10. Swoole coroutine async support
+ */
+class Producer
+{
+    use ClientTrait;
+
+    private $client;
+    private $endpoints;
+    private $clientId;
+    private $telemetrySession;
+    private $publishingRouteDataCache = [];
+    private $isRunning = false;
+    private $shutdownRequested = false;
+    private $maxAttempts = 3;
+    private $requestTimeout = 3000; // ms
+    private $topics = [];
+    private $isolatedEndpoints = [];
+    private $namespace = '';
+    private $logger;
+    private $credentials = null; // SessionCredentials for AK/SK auth
+    private $validateMessageType = true;
+    private $maxBodySizeBytes = 4194304; // 4MB default
+    private $heartbeatPid = null;
+    private $lastHeartbeatTime = 0;
+    private $interceptors = [];
+    private $transactionChecker = null;
+    private $localTransactionExecuter = null;
+    private $retryPolicy = null;
+    private $tlsCredentials = null;
+
+    /**
+     * Constructor
+     *
+     * @param string $endpoints gRPC server endpoint
+     * @param array $options Configuration options
+     * @deprecated Use ProducerBuilder instead.
+     */
+    public function __construct($endpoints, $options = [])
+    {
+        $this->endpoints = $endpoints;
+        $this->clientId = $options['clientId'] ?? ('php-producer-' . 
getmypid() . '-' . time());
+        $this->maxAttempts = $options['maxAttempts'] ?? 3;
+        $this->requestTimeout = $options['requestTimeout'] ?? 3000;
+        $this->topics = $options['topics'] ?? [];
+        $this->namespace = $options['namespace'] ?? '';
+        $this->validateMessageType = $options['validateMessageType'] ?? true;
+        $this->maxBodySizeBytes = $options['maxBodySizeBytes'] ?? 4194304;
+        $this->tlsCredentials = $options['tlsCredentials'] ?? null;
+
+        // Set AK/SK credentials if provided
+        if (isset($options['credentials']) && $options['credentials'] 
instanceof SessionCredentials) {
+            $this->credentials = $options['credentials'];
+        }
+
+        // Initialize retry policy
+        $this->retryPolicy = new 
ExponentialBackoffRetryPolicy($this->maxAttempts, 1000, 30000, 2.0);
+
+        $this->logger = Logger::getInstance('Producer');
+
+        // Use RpcClientManager for connection pooling
+        $this->client = RpcClientManager::getInstance()->getClient($endpoints, 
[
+            'tlsCredentials' => $this->tlsCredentials,
+        ]);
+
+        // Initialize Telemetry Session (singleton)
+        $this->telemetrySession = TelemetrySession::getInstance($this->client, 
$endpoints, $this->clientId, $this->credentials, $this->namespace);
+    }
+
+    /**
+     * Set transaction checker for orphaned transaction recovery.
+     */
+    public function setTransactionChecker(TransactionChecker $checker): self
+    {
+        $this->transactionChecker = $checker;
+        return $this;
+    }
+
+    /**
+     * Set local transaction executer for auto commit/rollback of 
half-messages.
+     *
+     * @param LocalTransactionExecuter $executer
+     * @return $this
+     */
+    public function setLocalTransactionExecuter(LocalTransactionExecuter 
$executer): self
+    {
+        $this->localTransactionExecuter = $executer;
+        return $this;
+    }
+
+    public function start()
+    {
+        if ($this->isRunning) {
+            return;
+        }
+
+        try {
+            Logger::getInstance('Producer')->info("Begin to start the rocketmq 
producer, clientId={$this->clientId}");
+
+            // Establish Telemetry Session
+            $this->establishTelemetrySession();
+
+            // Register settings change callback
+            $this->registerSettingsCallback();
+
+            // Register transaction checker callback if set
+            $this->registerTransactionCheckerCallback();
+
+            // Warm up route cache
+            foreach ($this->topics as $topic) {
+                $this->getPublishingLoadBalancer($topic);
+            }
+
+            $this->isRunning = true;
+
+            // Start periodic heartbeat
+            $this->startHeartbeat();
+
+            Logger::getInstance('Producer')->info("The rocketmq producer 
starts successfully, clientId={$this->clientId}");
+        } catch (\Exception $e) {
+            Logger::getInstance('Producer')->error("Failed to start: " . 
$e->getMessage());
+            $this->shutdown();
+            throw $e;
+        }
+    }
+
+    /**
+     * Synchronously send a message
+     *
+     * @param Message $message Message object
+     * @return array Send result ['messageId' => ..., 'transactionId' => ..., 
'status' => ...]
+     */
+    public function send(Message $message)
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+
+        $this->validateMessage($message);
+
+        $topic = $message->getTopic()->getName();
+        $loadBalancer = $this->getPublishingLoadBalancer($topic);
+        $candidates = 
$loadBalancer->takeMessageQueue($this->getIsolatedBrokerNames(), 
$this->maxAttempts);

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to