zhaohai666 commented on code in PR #1256:
URL: https://github.com/apache/rocketmq-clients/pull/1256#discussion_r3497372416


##########
php/TransactionTrait.php:
##########
@@ -0,0 +1,283 @@
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+use Apache\Rocketmq\V2\EndTransactionRequest;
+use Apache\Rocketmq\V2\Endpoints;
+use Apache\Rocketmq\V2\TransactionResolution;
+use Apache\Rocketmq\V2\TransactionSource;
+use Apache\Rocketmq\V2\Resource;
+use Apache\Rocketmq\V2\Message;
+
+/**
+ * TransactionTrait — Transaction (half-message) send, commit, rollback, and 
orphaned recovery.
+ *
+ * Extracted from Producer. The using class must provide:
+ * - Properties: $isRunning, $validator (MessageValidator, protected)
+ * - ClientTrait methods: buildMetadata(), getOperationTimeout()
+ * - Send delegation: validateMessage(), detectMessageType(),
+ *   wrapTransactionMessageRequest(), sendMessageWithRetry()
+ * - Route delegation: getPublishingLoadBalancer(), getIsolatedBrokerNames()
+ * - Infrastructure: getClientForRpc(), getTelemetrySession(), getLogger(),
+ *   getSettingsMaxAttempts(), getSettingsTlsCredentials(), 
isSettingsSslEnabled()
+ * - Interceptors: executeInterceptors()
+ */
+trait TransactionTrait
+{
+    private ?TransactionChecker $transactionChecker = null;
+    private ?LocalTransactionExecuter $localTransactionExecuter = null;
+
+    /**
+     * Set transaction checker for orphaned transaction recovery.
+     */
+    public function setTransactionChecker(TransactionChecker $checker): self
+    {
+        $this->transactionChecker = $checker;
+        return $this;
+    }
+
+    /**
+     * Set local transaction executer for auto commit/rollback of 
half-messages.
+     */
+    public function setLocalTransactionExecuter(LocalTransactionExecuter 
$executer): self
+    {
+        $this->localTransactionExecuter = $executer;
+        return $this;
+    }
+
+    /**
+     * Send a transaction message (half-message + local transaction + 
commit/rollback).
+     */
+    public function sendWithTransaction(Message $message, Transaction 
$transaction, ?LocalTransactionExecuter $executor = null): array
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+
+        $this->validateMessage($message);
+
+        $sysProps = $message->getSystemProperties();
+        $hasMessageGroup = $sysProps && $sysProps->hasMessageGroup();
+        $hasLiteTopic = $sysProps && $sysProps->hasLiteTopic();
+        $hasDeliveryTimestamp = $sysProps && $sysProps->hasDeliveryTimestamp();
+        $hasPriority = $sysProps && $sysProps->hasPriority();
+
+        if ($hasMessageGroup || $hasLiteTopic || $hasDeliveryTimestamp || 
$hasPriority) {
+            throw new \InvalidArgumentException(
+                "Transactional message should not set messageGroup, 
deliveryTimestamp, liteTopic, or priority"
+            );
+        }
+
+        $topic = $message->getTopic()->getName();
+        $loadBalancer = $this->getPublishingLoadBalancer($topic);
+        $messageQueue = 
$loadBalancer->takeMessageQueue($this->getIsolatedBrokerNames(), 
$this->getSettingsMaxAttempts());
+
+        if (empty($messageQueue)) {
+            throw new \RuntimeException("No available message queue for topic: 
{$topic}");
+        }
+
+        if ($this->validator->isValidateMessageType()) {
+            $msgType = $this->detectMessageType($message, true);
+            $loadBalancer->validateMessageTypeAgainstQueue($messageQueue[0], 
$msgType, $topic);
+        }
+
+        $request = $this->wrapTransactionMessageRequest([$message], 
$messageQueue[0]);
+        $result = $this->sendMessageWithRetry($request, $message, 
$messageQueue, $this->getSettingsMaxAttempts());
+
+        if (isset($result['transactionId'])) {
+            $transaction->tryAddMessage($message);
+            $transaction->tryAddReceipt($message, $result, 
PublishingRouteManager::extractMessageQueueEndpoint($messageQueue[0]));
+        }
+
+        if ($executor !== null) {

Review Comment:
   Fixed



##########
php/TelemetrySession.php:
##########
@@ -0,0 +1,949 @@
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+
+use Apache\Rocketmq\V2\MessagingServiceClient;
+use Apache\Rocketmq\V2\TelemetryCommand;
+use Apache\Rocketmq\V2\Settings;
+use Exception;
+use Grpc\ChannelCredentials;
+
+/**
+ * TelemetrySession - Telemetry Session (full implementation referencing Java 
ClientSessionImpl)
+ *
+ * Core features:
+ * 1. Singleton pattern (same Endpoints share Session)
+ * 2. Settings sync confirmation mechanism
+ * 3. Bidirectional stream management
+ * 4. Command dispatch processing
+ * 5. Automatic reconnection mechanism
+ * 6. Swoole coroutine background reader for server-pushed commands
+ */
+class TelemetrySession
+{
+    private static array $instances = [];
+    private static array $instanceTimestamps = [];
+    private const MAX_INSTANCES = 10;
+    /** Session TTL in seconds; sessions older than this are evicted even if 
stream appears alive. */
+    private const SESSION_TTL_SECONDS = 1800; // 30 minutes
+    private object $client;
+    private string $endpoints;
+    /** @var object|null gRPC stream */
+    private $stream;
+    private Logger $logger;
+    private string $clientId;
+
+    // Settings sync state
+    private bool $settingsSynced = false;
+    private ?string $settingsError = null;
+    /**
+     * Maximum number of consecutive errors before giving up.
+     */
+    private const MAX_CONSECUTIVE_ERRORS = 10;
+    /**
+     * Read timeout in seconds.
+     */
+    private const READ_TIMEOUT_SECONDS = 30.0;
+    private float $settingsTimeout = 3.0; // seconds, matching Java's 
SETTINGS_INITIALIZATION_TIMEOUT
+
+    // Credentials for AK/SK signing
+    private ?SessionCredentials $credentials = null;
+
+    // Namespace for resource scoping
+    private string $namespace = '';
+
+    // Settings received from server
+    private ?object $serverSettings = null;
+
+    // Settings change callback
+    /** @var callable|null */
+    private $onSettingsChange = null;
+
+    // Server command callbacks
+    /** @var callable|null */
+    private $onRecoverOrphanedTransaction = null;
+    /** @var callable|null */
+    private $onVerifyMessage = null;
+    /** @var callable|null */
+    private $onPrintThreadStackTrace = null;
+    /** @var callable|null */
+    private $onReconnectEndpoints = null;
+    /** @var callable|null */
+    private $onNotifyUnsubscribeLite = null;
+
+    // Swoole coroutine reader state
+    private int $swooleCoroutineId = -1;
+    private bool $isClosing = false;
+    private bool $isReconnecting = false;
+    private ?object $lastSettingsCommand = null;
+
+    /**
+     * Initialize telemetry session with client and connection details.
+     *
+     * @param object $client gRPC messaging service client
+     * @param string $endpoints Server endpoints
+     * @param string|null $clientId Client identifier
+     * @param SessionCredentials|null $credentials Session credentials for 
signing
+     * @param string $namespace Resource namespace
+     */
+    private function __construct(object $client, string $endpoints, ?string 
$clientId = null, ?SessionCredentials $credentials = null, string $namespace = 
'')
+    {
+        $this->client = $client;
+        $this->endpoints = $endpoints;
+        $this->credentials = $credentials;
+        $this->namespace = $namespace;
+        $this->logger = Logger::getInstance('TelemetrySession');
+        if ($clientId) {
+            $this->clientId = $clientId;
+        }
+    }
+
+    /**
+     * Register callback for server settings changes.
+     *
+     * @param callable $callback Callback receiving server Settings
+     * @return void
+     */
+    public function setOnSettingsChange(callable $callback): void
+    {
+        $this->onSettingsChange = $callback;
+    }
+
+    /**
+     * Register callback for orphaned transaction recovery.
+     *
+     * @param callable $callback Callback receiving 
RecoverOrphanedTransactionCommand
+     * @return void
+     */
+    public function setOnRecoverOrphanedTransaction(callable $callback): void
+    {
+        $this->onRecoverOrphanedTransaction = $callback;
+    }
+
+    /**
+     * Register callback for message verification.
+     *
+     * @param callable $callback Callback receiving VerifyMessageCommand
+     * @return void
+     */
+    public function setOnVerifyMessage(callable $callback): void
+    {
+        $this->onVerifyMessage = $callback;
+    }
+
+    /**
+     * Register callback for printing thread stack trace.
+     *
+     * @param callable $callback Callback receiving 
PrintThreadStackTraceCommand
+     * @return void
+     */
+    public function setOnPrintThreadStackTrace(callable $callback): void
+    {
+        $this->onPrintThreadStackTrace = $callback;
+    }
+
+    /**
+     * Register callback for endpoint reconnection.
+     *
+     * @param callable $callback Callback receiving ReconnectEndpointsCommand
+     * @return void
+     */
+    public function setOnReconnectEndpoints(callable $callback): void
+    {
+        $this->onReconnectEndpoints = $callback;
+    }
+
+    /**
+     * Register callback for unsubscribe notification.
+     *
+     * @param callable $callback Callback receiving 
NotifyUnsubscribeLiteCommand
+     * @return void
+     */
+    public function setOnNotifyUnsubscribeLite(callable $callback): void
+    {
+        $this->onNotifyUnsubscribeLite = $callback;
+    }
+
+    /**
+     * Get the current server settings.
+     *
+     * @return object|null Server settings object or null
+     */
+    public function getServerSettings()
+    {
+        return $this->serverSettings;
+    }
+
+    /**
+     * Reset all session instances (mainly for testing).
+     *
+     * @return void
+     */
+    public static function resetAll(): void
+    {
+        self::$instances = [];
+        self::$instanceTimestamps = [];
+    }
+
+    /**
+     * Get or create a session instance for the given endpoints.
+     *
+     * @param object $client gRPC messaging service client
+     * @param string $endpoints Server endpoints
+     * @param string|null $clientId Client identifier
+     * @param SessionCredentials|null $credentials Session credentials
+     * @param string $namespace Resource namespace
+     * @return self
+     */
+    public static function getInstance(object $client, string $endpoints, 
?string $clientId = null, ?SessionCredentials $credentials = null, string 
$namespace = ''): self
+    {
+        $credId = $credentials !== null ? spl_object_id($credentials) : 'none';
+        $effectiveClientId = $clientId ?? 'none';
+        $key = $endpoints . '|' . $credId . '|' . $namespace . '|' . 
$effectiveClientId;
+
+        if (isset(self::$instances[$key])) {
+            $existing = self::$instances[$key];
+            $age = time() - (self::$instanceTimestamps[$key] ?? 0);
+
+            if (!$existing->isAlive() || $age > self::SESSION_TTL_SECONDS) {
+                $reason = !$existing->isAlive() ? 'dead stream' : "TTL expired 
({$age}s > " . self::SESSION_TTL_SECONDS . "s)";
+                Logger::getInstance('TelemetrySession')->info("Evicting stale 
session for endpoints: {$endpoints}, reason: {$reason}");
+                $existing->close();
+                unset(self::$instances[$key]);
+                unset(self::$instanceTimestamps[$key]);
+            }
+        }
+
+        if (!isset(self::$instances[$key])) {
+            if (count(self::$instances) >= self::MAX_INSTANCES) {
+                self::evictOldest();
+            }
+            Logger::getInstance('TelemetrySession')->info("Creating new 
session for endpoints: {$endpoints}, clientId: {$effectiveClientId}");
+            $instance = new self($client, $endpoints, $clientId, $credentials, 
$namespace);
+            self::$instances[$key] = $instance;
+            self::$instanceTimestamps[$key] = time();
+        }
+
+        return self::$instances[$key];
+    }
+
+    /**
+     * Check if this session is still alive.
+     *
+     * Health check hierarchy:
+     *   1. isClosing flag → immediately stale
+     *   2. Stream was created but is now closed → stale
+     *   3. Stream exists but write probe fails → stale
+     *   4. Session never started (no stream yet) → considered alive (pending 
start)
+     *
+     * @return bool
+     */
+    private function isAlive(): bool
+    {
+        if ($this->isClosing) {
+            return false;
+        }
+
+        if ($this->stream !== null) {
+            if ($this->isStreamClosed()) {
+                return false;
+            }
+            // Probe write capability: if write fails, the stream is stale
+            if (!$this->probeStreamWritable()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Non-destructive probe to check if the stream is still writable.
+     * Sends a zero-length write which gRPC treats as a keepalive check.
+     *
+     * @return bool true if stream accepts writes
+     */
+    private function probeStreamWritable(): bool
+    {
+        if ($this->stream === null) {
+            return false;
+        }
+        try {
+            // Attempt flush as a lightweight connectivity probe.
+            // gRPC flush will throw if the underlying channel is broken.
+            $this->stream->flush();
+            return true;
+        } catch (\Throwable $e) {
+            $this->logger->debug("Stream write probe failed: " . 
$e->getMessage());
+            return false;
+        }
+    }
+
+    /**
+     * Check if the underlying stream is closed.
+     *
+     * @return bool
+     */
+    private function isStreamClosed(): bool
+    {
+        if ($this->stream === null) {
+            return true;
+        }
+        try {
+            $status = $this->stream->getStatus();
+            $code = is_object($status) ? ($status->code ?? -1) : 
(is_array($status) ? ($status['code'] ?? -1) : -1);
+            if ($code !== 0) {
+                return true;
+            }
+        } catch (\Exception $e) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Evict the oldest instance to make room for a new one.
+     * @return void
+     */
+    private static function evictOldest(): void
+    {
+        $oldestKey = null;
+        $oldestTime = PHP_INT_MAX;
+        foreach (self::$instanceTimestamps as $key => $timestamp) {
+            if ($timestamp < $oldestTime) {
+                $oldestTime = $timestamp;
+                $oldestKey = $key;
+            }
+        }
+
+        if ($oldestKey !== null) {
+            Logger::getInstance('TelemetrySession')->info("Evicting oldest 
session (max instance reached): {$oldestKey}");
+            if (isset(self::$instances[$oldestKey])) {
+                self::$instances[$oldestKey]->close();
+            }
+            unset(self::$instances[$oldestKey]);
+            unset(self::$instanceTimestamps[$oldestKey]);
+        }
+    }
+
+    /**
+     * Synchronize settings with broker via telemetry stream.
+     *
+     * @param object $settingsCommand Telemetry command containing settings
+     * @return bool True if settings were successfully synced
+     */
+    public function syncSettings($settingsCommand)
+    {
+        $this->lastSettingsCommand = $settingsCommand;
+        $this->isClosing = false;
+        
+        // Create stream and send settings
+        $success = $this->createStreamAndSync($settingsCommand);
+        if (!$success) {
+            return false;
+        }
+        
+        // Wait for settings confirmation with timeout
+        return $this->waitForSettingsConfirmation();
+    }
+
+    /**
+     * Wait for settings confirmation from broker with timeout.
+     * In Swoole mode, the background reader will set settingsSynced when 
SETTINGS is received.
+     * In non-Swoole mode, we poll manually with exponential backoff.
+     *
+     * @return bool True if settings confirmed before timeout
+     */
+    private function waitForSettingsConfirmation(): bool
+    {
+        $startTime = microtime(true);
+        $pollIntervalUs = 10000;
+        $maxPollIntervalUs = 200000;
+        
+        while (microtime(true) - $startTime < $this->settingsTimeout) {
+            if ($this->settingsSynced) {
+                $elapsed = round(microtime(true) - $startTime, 2);
+                $this->logger->info("Settings confirmed by broker after 
{$elapsed}s");
+                return true;
+            }
+            
+            if ($this->settingsError !== null) {
+                $this->logger->warning("Settings sync issue (non-fatal): " . 
$this->settingsError);
+                return true;

Review Comment:
   Fixed



##########
.github/workflows/php_build.yml:
##########
@@ -17,9 +17,20 @@ jobs:
         uses: shivammathur/setup-php@v2
         with:
           php-version: ${{ matrix.php-version }}
+          extensions: grpc, protobuf
       - name: Validate composer.json
         working-directory: ./php
         run: composer validate
       - name: Install Dependencies
         working-directory: ./php
-        run: composer install
+        run: composer install --no-interaction --prefer-dist
+      - name: Run PHPUnit Tests
+        if: runner.os != 'Windows'
+        working-directory: ./php
+        run: vendor/bin/phpunit --testsuite "RocketMQ PHP Test Suite" 
--no-coverage
+      - name: Run PHPUnit Tests (Windows)
+        if: runner.os == 'Windows'
+        working-directory: ./php
+        # Workaround: gRPC C extension may cause non-zero exit during PHP 
shutdown on
+        # Windows even when all tests pass. Force exit 0 to prevent false CI 
failures.
+        run: vendor/bin/phpunit --testsuite "RocketMQ PHP Test Suite" 
--no-coverage; exit 0

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to