glcrazier commented on code in PR #767:
URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1685358156


##########
rust/src/push_consumer.rs:
##########
@@ -0,0 +1,1303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use mockall::automock;
+use mockall_double::double;
+use parking_lot::{Mutex, RwLock};
+use prost_types::Duration;
+use slog::Logger;
+use slog::{debug, error, info, warn};
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use tokio::select;
+use tokio::sync::{mpsc, oneshot};
+
+#[double]
+use crate::client::Client;
+use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption};
+use crate::error::{ClientError, ErrorKind};
+use crate::model::common::{ClientType, ConsumeResult, FilterExpression, MessageQueue};
+use crate::model::message::{AckMessageEntry, MessageView};
+use crate::pb::receive_message_response::Content;
+use crate::pb::{
+    AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, QueryAssignmentRequest,
+    ReceiveMessageRequest, Resource,
+};
+use crate::session::{RPCClient, Session};
+use crate::util::{build_endpoints_by_message_queue, build_push_consumer_settings};
+use crate::{log, pb};
+
+const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new";
+const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message";
+const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message";
+const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start";
+const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "push_consumer.change_invisible_duration";
+
+pub type MessageListener = Box<dyn Fn(&MessageView) -> ConsumeResult + Send + Sync>;
+
+pub struct PushConsumer {
+    logger: Logger,
+    client: Client,
+    message_listener: Arc<MessageListener>,
+    option: Arc<RwLock<PushConsumerOption>>,
+    shutdown_tx: Option<oneshot::Sender<()>>,
+}
+
+/*
+ * An actor is required for each message queue.
+ * It is responsible for polling messages from the message queue. It communicates with PushConsumer by channels.
+ */
+struct MessageQueueActor {
+    logger: Logger,
+    rpc_client: Session,
+    message_queue: MessageQueue,
+    shutdown_tx: Option<oneshot::Sender<()>>,
+    option: PushConsumerOption,
+    message_listener: Arc<MessageListener>,
+    retry_policy: BackOffRetryPolicy,
+}
+
+impl PushConsumer {
+    pub fn new(
+        client_option: ClientOption,
+        option: PushConsumerOption,
+        message_listener: MessageListener,
+    ) -> Result<Self, ClientError> {
+        if option.consumer_group().is_empty() {
+            return Err(ClientError::new(
+                ErrorKind::Config,
+                "consumer group is required.",
+                OPERATION_NEW_PUSH_CONSUMER,
+            ));
+        }
+        if option.subscription_expressions().is_empty() {
+            return Err(ClientError::new(
+                ErrorKind::Config,
+                "subscription expressions is required.",
+                OPERATION_NEW_PUSH_CONSUMER,
+            ));
+        }
+        let client_option = ClientOption {
+            client_type: ClientType::PushConsumer,
+            group: Some(option.consumer_group().to_string()),
+            ..client_option
+        };
+        let logger = log::logger(option.logging_format());
+        let client = Client::new(
+            &logger,
+            client_option,
+            build_push_consumer_settings(&option),
+        )?;
+        Ok(Self {
+            logger,
+            client,
+            message_listener: Arc::new(message_listener),
+            option: Arc::new(RwLock::new(option)),
+            shutdown_tx: None,
+        })
+    }
+
+    pub async fn start(&mut self) -> Result<(), ClientError> {
+        let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16);
+        self.client.start(telemetry_command_tx).await?;
+        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+        self.shutdown_tx = Some(shutdown_tx);
+        let option = Arc::clone(&self.option);
+        let mut rpc_client = self.client.get_session().await?;
+        let route_manager = self.client.get_route_manager();
+        let topics;
+        {
+            topics = option
+                .read()
+                .subscription_expressions()
+                .keys()
+                .cloned()
+                .collect();
+        }
+        route_manager
+            .sync_topic_routes(&mut rpc_client, topics)
+            .await?;
+        let logger = self.logger.clone();
+        let mut actor_table: HashMap<MessageQueue, MessageQueueActor> = HashMap::new();
+
+        let message_listener = Arc::clone(&self.message_listener);
+        // must retrieve settings from the server first.
+        let command = telemetry_command_rx.recv().await.ok_or(ClientError::new(
+            ErrorKind::Connect,
+            "telemetry command channel closed.",
+            OPERATION_START_PUSH_CONSUMER,
+        ))?;
+        let backoff_policy = Arc::new(Mutex::new(BackOffRetryPolicy::try_from(command).map_err(
+            |_| {
+                ClientError::new(
+                    ErrorKind::Connect,
+                    "backoff policy is required.",
+                    OPERATION_START_PUSH_CONSUMER,
+                )
+            },
+        )?));
+
+        tokio::spawn(async move {
+            let mut scan_assignments_timer =
+                tokio::time::interval(std::time::Duration::from_secs(30));
+            loop {
+                select! {
+                    command = telemetry_command_rx.recv() => {
+                        if let Some(command) = command {
+                            let remote_backoff_policy = BackOffRetryPolicy::try_from(command);
+                            if let Ok(remote_backoff_policy) = remote_backoff_policy {
+                                let mut guard = backoff_policy.lock();
+                                *guard = remote_backoff_policy;
+                            }
+                        }
+                    }
+                    _ = scan_assignments_timer.tick() => {
+                        let consumer_option;
+                        {
+                            consumer_option = option.read().clone();
+                        }
+                        let subscription_table = consumer_option.subscription_expressions();
+                        // query endpoints from topic route
+                        let retry_policy = Arc::clone(&backoff_policy);
+                        let mut client = rpc_client.shadow_session();
+                        for topic in subscription_table.keys() {
+                            if let Some(endpoints) = route_manager.pick_endpoints(topic.as_str()) {
+                                let request = QueryAssignmentRequest {
+                                    topic: Some(Resource {
+                                        name: topic.to_string(),
+                                        resource_namespace: consumer_option.namespace().to_string(),
+                                    }),
+                                    group: Some(Resource {
+                                        name: consumer_option.consumer_group().to_string(),
+                                        resource_namespace: consumer_option.namespace().to_string(),
+                                    }),
+                                    endpoints: Some(endpoints.into_inner()),
+                                };
+                                let retry_policy_inner;
+                                {
+                                    retry_policy_inner = retry_policy.lock().clone();
+                                }
+                                let result = client.query_assignment(request).await;
+                                if let Ok(response) = result {
+                                    if Client::handle_response_status(response.status, OPERATION_START_PUSH_CONSUMER).is_ok() {
+                                            let _ = Self::process_assignments(logger.clone(),
+                                                rpc_client.shadow_session(),
+                                                &consumer_option,
+                                                Arc::clone(&message_listener),
+                                                &mut actor_table,
+                                                response.assignments,
+                                                retry_policy_inner,
+                                            ).await;
+                                    } else {
+                                        error!(logger, "query assignment 
failed, no status in response.");
+                                    }
+                                } else {
+                                    error!(logger, "query assignment failed: 
{:?}", result.unwrap_err());
+                                }
+                            }
+                        }
+                    }
+                    _ = &mut shutdown_rx => {
+                        let entries = actor_table.drain();
+                        info!(logger, "shutdown {:?} actors", entries.len());
+                        for (_, actor) in entries {
+                            let _ = actor.shutdown().await;
+                        }
+                        break;
+                    }
+                }
+            }
+        });
+        Ok(())
+    }
+
+    async fn process_assignments(
+        logger: Logger,
+        rpc_client: Session,
+        option: &PushConsumerOption,
+        message_listener: Arc<MessageListener>,
+        actor_table: &mut HashMap<MessageQueue, MessageQueueActor>,
+        mut assignments: Vec<Assignment>,
+        retry_policy: BackOffRetryPolicy,
+    ) -> Result<(), ClientError> {
+        let message_queues: Vec<MessageQueue> = assignments
+            .iter_mut()
+            .filter_map(|assignment| {
+                if let Some(message_queue) = assignment.message_queue.take() {
+                    return MessageQueue::from_pb_message_queue(message_queue).ok();
+                }
+                None
+            })
+            .collect();
+        // remove existing actors from the map; the remaining ones will be shut down.
+        let mut actors: Vec<MessageQueueActor> = Vec::with_capacity(actor_table.len());
+        message_queues.iter().for_each(|message_queue| {
+            let entry = actor_table.remove(message_queue);
+            if let Some(actor) = entry {
+                actors.push(actor);
+            }
+        });
+        // the remaining actors will be shut down.
+        let shutdown_entries = actor_table.drain();
+        for (_, actor) in shutdown_entries {
+            let _ = actor.shutdown().await;
+        }
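+        // split the total cache budget evenly across the assigned queues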
+        let mut max_cache_messages_per_queue = option.max_cache_message_count();
+        if !message_queues.is_empty() {
+            max_cache_messages_per_queue = option.max_cache_message_count() / message_queues.len();
+        }
+        for mut actor in actors {
+            let mut option = option.clone();
+            option.set_max_cache_message_count(max_cache_messages_per_queue);
+            actor.set_option(option);
+            actor.set_retry_policy(retry_policy.clone());
+            actor_table.insert(actor.message_queue.clone(), actor);
+        }
+
+        // start new actors
+        for message_queue in message_queues {
+            if actor_table.contains_key(&message_queue) {
+                continue;
+            }
+            let mut option = option.clone();
+            option.set_max_cache_message_count(max_cache_messages_per_queue);
+            let mut actor = MessageQueueActor::new(
+                logger.clone(),
+                rpc_client.shadow_session(),
+                message_queue.clone(),
+                option,
+                Arc::clone(&message_listener),
+                retry_policy.clone(),
+            );
+            let result = actor.start().await;
+            if result.is_ok() {
+                actor_table.insert(message_queue, actor);
+            }
+        }
+        Ok(())
+    }
+
+    pub async fn shutdown(mut self) -> Result<(), ClientError> {
+        if let Some(shutdown_tx) = self.shutdown_tx.take() {
+            let _ = shutdown_tx.send(());
+        }
+        self.client.shutdown().await?;
+        Ok(())
+    }
+}
+
+struct MessageHandler {
+    message: MessageView,
+    status: ConsumeResult,
+    attempt: usize,
+}
+
+#[automock]
+impl MessageQueueActor {
+    pub(crate) fn new(
+        logger: Logger,
+        rpc_client: Session,
+        message_queue: MessageQueue,
+        option: PushConsumerOption,
+        message_listener: Arc<MessageListener>,
+        retry_policy: BackOffRetryPolicy,
+    ) -> Self {
+        Self {
+            logger,
+            rpc_client,
+            message_queue,
+            shutdown_tx: None,
+            option,
+            message_listener: Arc::clone(&message_listener),
+            retry_policy,
+        }
+    }
+
+    // Note: the following two setters do not affect an actor that is already running, since the spawned task holds a forked copy (see fork_self).
+    pub(crate) fn set_option(&mut self, option: PushConsumerOption) {
+        self.option = option;
+    }
+
+    pub(crate) fn set_retry_policy(&mut self, retry_policy: BackOffRetryPolicy) {
+        self.retry_policy = retry_policy;
+    }
+
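+    // Builds a copy of this actor for the spawned task: the message
+    // listener is shared via Arc, the session is shadowed, and the
+    // option and retry policy are cloned.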
+    fn fork_self(&self) -> Self {
+        Self {
+            logger: self.logger.clone(),
+            rpc_client: self.rpc_client.shadow_session(),
+            message_queue: self.message_queue.clone(),
+            shutdown_tx: None,
+            option: self.option.clone(),
+            message_listener: Arc::clone(&self.message_listener),
+            retry_policy: self.retry_policy.clone(),
+        }
+    }
+
+    pub(crate) fn get_consumer_group(&self) -> Resource {
+        Resource {
+            name: self.option.consumer_group().to_string(),
+            resource_namespace: self.option.namespace().to_string(),
+        }
+    }
+
+    #[allow(clippy::needless_lifetimes)]
+    pub(crate) fn get_filter_expression<'a>(&'a self) -> Option<&'a FilterExpression> {
+        self.option
+            .subscription_expressions()
+            .get(self.message_queue.topic.name.as_str())
+    }
+
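+    // The actor loop runs on a spawned task driven by three signals: an
+    // internal "poll" channel that triggers receive_messages, an "ack"
+    // channel that drains the in-memory handler queue, and a 1-second
+    // ticker that re-arms both based on the queue's fill level. A oneshot
+    // shutdown signal flushes the waiting queue before the task exits.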
+    pub(crate) async fn start(&mut self) -> Result<(), ClientError> {
+        debug!(
+            self.logger,
+            "start a new queue actor {:?}", self.message_queue
+        );
+        let (tx, mut shutdown_rx) = oneshot::channel();
+        self.shutdown_tx = Some(tx);
+        let (tx, mut poll_rx) = mpsc::channel(8);
+        let poll_tx: mpsc::Sender<()> = tx;
+        let logger = self.logger.clone();
+        let mut actor = self.fork_self();
+        let mut message_handler_queue: VecDeque<MessageHandler> = VecDeque::new();
+        let (tx, mut ack_rx) = mpsc::channel(8);
+        let ack_tx: mpsc::Sender<()> = tx;
+        let mut queue_check_ticker = tokio::time::interval(std::time::Duration::from_secs(1));
+        let mut rpc_client = actor.rpc_client.shadow_session();
+        tokio::spawn(async move {
+            loop {
+                select! {
+                    _ = poll_rx.recv() => {
+                        actor.receive_messages(&mut rpc_client, &mut message_handler_queue, poll_tx.clone()).await;
+                    }
+                    _ = ack_rx.recv() => {
+                        actor.ack_message_in_waiting_queue(&mut rpc_client, &mut message_handler_queue, ack_tx.clone()).await;
+                    }
+                    _ = queue_check_ticker.tick() => {
+                        if !message_handler_queue.is_empty() {
+                            let _ = ack_tx.try_send(());
+                        }
+                        if message_handler_queue.len() < actor.option.max_cache_message_count() {
+                            let _ = poll_tx.try_send(());
+                        }
+                    }
+                    _ = &mut shutdown_rx => {
+                        info!(logger, "message queue actor shutdown");
+                        actor.flush_waiting_queue(&mut rpc_client, &mut message_handler_queue).await;
+                        break;
+                    }
+                }
+            }
+        });
+        Ok(())
+    }
+
+    async fn receive_messages<T: RPCClient + 'static>(
+        &mut self,
+        rpc_client: &mut T,
+        message_handler_queue: &mut VecDeque<MessageHandler>,
+        poll_tx: mpsc::Sender<()>,
+    ) {
+        let poll_result = self.poll_messages(rpc_client).await;
+        if let Ok(messages) = poll_result {
+            for message in messages {
+                let consume_result = (self.message_listener)(&message);
+                match consume_result {
+                    ConsumeResult::SUCCESS => {
+                        let result = self.ack_message(rpc_client, &message).await;
+                        if result.is_err() {
+                            info!(

Review Comment:
   Your idea is great. I refactored the code so that the consumption of standard and FIFO messages is encapsulated in separate implementations, and I also extracted the acknowledgement work into a single implementation.
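
   For anyone following the thread, here is a minimal sketch of the shape that split could take. The names (`ConsumeLoop`, `StandardConsumeLoop`, `FifoConsumeLoop`, `ack_message`) are hypothetical stand-ins, not the identifiers used in this PR; the point is that standard consumption settles each message independently, FIFO consumption settles messages strictly in order, and both reuse one acknowledgement path:

   ```rust
   // Hypothetical sketch of the refactoring described above; the real PR
   // wires these into the queue actor and the gRPC session.
   #[allow(non_camel_case_types, dead_code)]
   enum ConsumeResult {
       SUCCESS,
       FAILURE,
   }

   struct MessageView; // stand-in for the real message view

   // The acknowledgement work, extracted into a single implementation
   // shared by both consumption modes.
   fn ack_message(_message: &MessageView) {
       // an AckMessageRequest would be sent over the session here
   }

   // Each consumption mode is encapsulated behind one trait.
   trait ConsumeLoop {
       fn consume(
           &self,
           batch: Vec<MessageView>,
           listener: &dyn Fn(&MessageView) -> ConsumeResult,
       );
   }

   // Standard consumption: every message is settled independently, so one
   // failure does not block the rest of the batch.
   struct StandardConsumeLoop;

   impl ConsumeLoop for StandardConsumeLoop {
       fn consume(
           &self,
           batch: Vec<MessageView>,
           listener: &dyn Fn(&MessageView) -> ConsumeResult,
       ) {
           for message in batch {
               if let ConsumeResult::SUCCESS = listener(&message) {
                   ack_message(&message);
               }
               // a failed message would be redelivered after its
               // invisible duration expires
           }
       }
   }

   // FIFO consumption: messages must be settled in order, so the loop
   // stops at the first message that does not succeed.
   struct FifoConsumeLoop;

   impl ConsumeLoop for FifoConsumeLoop {
       fn consume(
           &self,
           batch: Vec<MessageView>,
           listener: &dyn Fn(&MessageView) -> ConsumeResult,
       ) {
           for message in batch {
               match listener(&message) {
                   ConsumeResult::SUCCESS => ack_message(&message),
                   // a real implementation would retry the head message up
                   // to an attempt limit before abandoning the batch
                   ConsumeResult::FAILURE => break,
               }
           }
       }
   }
   ```

   The queue actor would then hand each polled batch to whichever `ConsumeLoop` the consumer option selects, keeping the polling/ack loop itself unchanged.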



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
