This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 03d86caf6c0dd728234da6c413f4683445013855
Author: Lei Zhang <coolbee...@gmail.com>
AuthorDate: Tue Sep 10 17:21:49 2019 +0800

    SCB-1368 Refactoring memory channel
---
 .../memory/MemoryChannelAutoConfiguration.java     | 62 +++++++++++++++++++
 .../channel/memory/MemorySagaEventConsumer.java    | 69 ++++++++++++++++++++++
 2 files changed, 131 insertions(+)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryChannelAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryChannelAutoConfiguration.java
new file mode 100644
index 0000000..7ad7878
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryChannelAutoConfiguration.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.pack.alpha.fsm.channel.memory;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import java.lang.invoke.MethodHandles;
+import javax.annotation.PostConstruct;
+import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import 
org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Spring configuration that wires the in-memory event channel and its consumer.
+ *
+ * <p>Active when the property {@code alpha.feature.akka.channel.type} equals
+ * {@code memory}, or when that property is not set at all.
+ */
+@Configuration
+@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true)
+public class MemoryChannelAutoConfiguration {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Size handed to {@link MemoryActorEventChannel}; falls back to {@code -1} when
+   * {@code alpha.feature.akka.channel.memory.size} is not configured.
+   */
+  @Value("${alpha.feature.akka.channel.memory.size:-1}")
+  int memoryEventChannelMemorySize;
+
+  /** Logs a marker line once this configuration has been constructed. */
+  @PostConstruct
+  public void init() {
+    LOG.info("Memory Channel Init");
+  }
+
+  /**
+   * Registers the in-memory channel under the bean name {@code memoryEventChannel},
+   * unless another {@link ActorEventChannel} bean is already defined.
+   *
+   * @param metricsService metrics collaborator passed through to the channel
+   * @return a new {@link MemoryActorEventChannel} built with the configured size
+   */
+  @Bean(name = "memoryEventChannel")
+  @ConditionalOnMissingBean(ActorEventChannel.class)
+  public ActorEventChannel memoryEventChannel(MetricsService metricsService) {
+    final ActorEventChannel memoryChannel =
+        new MemoryActorEventChannel(metricsService, memoryEventChannelMemorySize);
+    return memoryChannel;
+  }
+
+  /**
+   * Creates the consumer for the {@code memoryEventChannel} bean.
+   *
+   * @param actorSystem the Akka actor system
+   * @param sagaShardRegionActor the actor qualified as {@code sagaShardRegionActor}
+   * @param metricsService metrics collaborator passed through to the consumer
+   * @param actorEventChannel the bean qualified as {@code memoryEventChannel}; it is cast to
+   *     {@link MemoryActorEventChannel}
+   * @return a new {@link MemorySagaEventConsumer}
+   */
+  @Bean
+  MemorySagaEventConsumer sagaEventMemoryConsumer(ActorSystem actorSystem,
+      @Qualifier("sagaShardRegionActor") ActorRef sagaShardRegionActor,
+      MetricsService metricsService,
+      @Qualifier("memoryEventChannel") ActorEventChannel actorEventChannel) {
+    final MemoryActorEventChannel memoryChannel = (MemoryActorEventChannel) actorEventChannel;
+    return new MemorySagaEventConsumer(
+        actorSystem, sagaShardRegionActor, metricsService, memoryChannel);
+  }
+}
\ No newline at end of file
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemorySagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemorySagaEventConsumer.java
new file mode 100644
index 0000000..f2af56b
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemorySagaEventConsumer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel.memory;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Consumer that forwards events from a {@link MemoryActorEventChannel} queue to the
+ * saga shard region actor.
+ *
+ * <p>Constructing an instance starts a thread named {@code MemorySagaEventConsumer}
+ * that runs until it is interrupted.
+ */
+public class MemorySagaEventConsumer extends AbstractEventConsumer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  final MemoryActorEventChannel channel;
+
+  /**
+   * Creates the consumer and immediately starts its polling thread.
+   *
+   * @param actorSystem the Akka actor system, passed to the superclass
+   * @param sagaShardRegionActor the actor that receives every consumed event
+   * @param metricsService metrics collaborator updated for each event
+   * @param channel the in-memory channel whose event queue is drained
+   */
+  public MemorySagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor,
+      MetricsService metricsService, MemoryActorEventChannel channel) {
+    super(actorSystem, sagaShardRegionActor, metricsService);
+    this.channel = channel;
+    new Thread(new MemorySagaEventConsumer.EventConsumer(), "MemorySagaEventConsumer").start();
+  }
+
+  /** Polling loop that hands queued events to the saga shard region actor. */
+  class EventConsumer implements Runnable {
+
+    /**
+     * Looks at the head of the queue, sends it to the actor, records metrics, and only
+     * then removes it. Sleeps 10 ms when the queue is empty. Stops when the thread is
+     * interrupted.
+     */
+    @Override
+    public void run() {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          BaseEvent event = channel.getEventQueue().peek();
+          if (event == null) {
+            Thread.sleep(10);
+            continue;
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("event {}", event);
+          }
+          long begin = System.currentTimeMillis();
+          metricsService.metrics().doActorReceived();
+          sagaShardRegionActor.tell(event, sagaShardRegionActor);
+          long end = System.currentTimeMillis();
+          metricsService.metrics().doActorAccepted();
+          metricsService.metrics().doActorAvgTime(end - begin);
+          // The event is removed only after it was handed over, so a failure above
+          // leaves it at the head of the queue and it is retried on the next pass.
+          channel.getEventQueue().poll();
+        } catch (InterruptedException ex) {
+          // An interrupt is a stop request, not a rejected event: restore the flag so
+          // the loop condition sees it and the thread terminates.
+          Thread.currentThread().interrupt();
+          LOG.info("MemorySagaEventConsumer interrupted, stopping");
+        } catch (Exception ex) {
+          metricsService.metrics().doActorRejected();
+          LOG.error(ex.getMessage(), ex);
+        }
+      }
+    }
+  }
+}

Reply via email to