This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 03d86caf6c0dd728234da6c413f4683445013855 Author: Lei Zhang <coolbee...@gmail.com> AuthorDate: Tue Sep 10 17:21:49 2019 +0800 SCB-1368 Refactoring memory channel --- .../memory/MemoryChannelAutoConfiguration.java | 62 +++++++++++++++++++ .../channel/memory/MemorySagaEventConsumer.java | 69 ++++++++++++++++++++++ 2 files changed, 131 insertions(+) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryChannelAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryChannelAutoConfiguration.java new file mode 100644 index 0000000..7ad7878 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryChannelAutoConfiguration.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.pack.alpha.fsm.channel.memory; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import java.lang.invoke.MethodHandles; +import javax.annotation.PostConstruct; +import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true) +public class MemoryChannelAutoConfiguration { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Value("${alpha.feature.akka.channel.memory.size:-1}") + int memoryEventChannelMemorySize; + + @PostConstruct + public void init(){ + LOG.info("Memory Channel Init"); + } + + @Bean(name = "memoryEventChannel") + @ConditionalOnMissingBean(ActorEventChannel.class) + public ActorEventChannel memoryEventChannel(MetricsService metricsService) { + return new MemoryActorEventChannel(metricsService, memoryEventChannelMemorySize); + } + + @Bean + MemorySagaEventConsumer sagaEventMemoryConsumer(ActorSystem actorSystem, + @Qualifier("sagaShardRegionActor") ActorRef sagaShardRegionActor, + MetricsService metricsService, + @Qualifier("memoryEventChannel") ActorEventChannel actorEventChannel) { + return new MemorySagaEventConsumer(actorSystem, sagaShardRegionActor, metricsService, + (MemoryActorEventChannel) actorEventChannel); + } +} \ No newline at end of file diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemorySagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemorySagaEventConsumer.java new file mode 100644 index 0000000..f2af56b --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemorySagaEventConsumer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.fsm.channel.memory; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import java.lang.invoke.MethodHandles; +import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MemorySagaEventConsumer extends AbstractEventConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + final MemoryActorEventChannel channel; + + public MemorySagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor, MetricsService metricsService, + MemoryActorEventChannel channel) { + super(actorSystem, sagaShardRegionActor, metricsService); + this.channel = channel; + new Thread(new MemorySagaEventConsumer.EventConsumer(), "MemorySagaEventConsumer").start(); + } + + class EventConsumer implements Runnable { + + @Override + public void run() { + while (true) { + try { + BaseEvent event = channel.getEventQueue().peek(); + if (event != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("event {}", event); + } + long begin = System.currentTimeMillis(); + metricsService.metrics().doActorReceived(); + sagaShardRegionActor.tell(event, sagaShardRegionActor); + long end = System.currentTimeMillis(); + metricsService.metrics().doActorAccepted(); + metricsService.metrics().doActorAvgTime(end - begin); + channel.getEventQueue().poll(); + } else { + Thread.sleep(10); + } + } catch (Exception ex) { + metricsService.metrics().doActorRejected(); + LOG.error(ex.getMessage(), ex); + } + } + } + } +}