This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit f3472e03936ab782806234a942d0f1a2d21e9c8d
Author: Lei Zhang <coolbee...@gmail.com>
AuthorDate: Sat Sep 21 00:51:38 2019 +0800

    SCB-1368 Kafka at-least-once delivery
---
 .../fsm/channel/kafka/KafkaSagaEventConsumer.java | 41 ++++++++++++----------
 1 file changed, 23 insertions(+), 18 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
index b816302..7790c12 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
@@ -19,14 +19,13 @@ package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.kafka.CommitterSettings;
+import akka.kafka.ConsumerMessage;
 import akka.kafka.ConsumerSettings;
 import akka.kafka.Subscriptions;
-import akka.kafka.javadsl.Committer;
 import akka.kafka.javadsl.Consumer;
 import akka.stream.ActorMaterializer;
 import akka.stream.Materializer;
-import akka.stream.javadsl.Keep;
+import akka.stream.javadsl.Sink;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import java.lang.invoke.MethodHandles;
@@ -51,6 +50,7 @@ public class KafkaSagaEventConsumer extends AbstractEventConsumer {
       MetricsService metricsService, String bootstrap_servers, String topic) {
     super(actorSystem, sagaShardRegionActor, metricsService);
+    // init consumer
     final Materializer materializer = ActorMaterializer.create(actorSystem);
     final Config consumerConfig = actorSystem.settings().config().getConfig("akka.kafka.consumer");
@@ -60,37 +60,42 @@ public class KafkaSagaEventConsumer extends AbstractEventConsumer {
         .withBootstrapServers(bootstrap_servers)
         .withGroupId(groupId)
         .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-        .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
         .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
         .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class")
-        .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-            "StringDeserializer.class");
-    CommitterSettings committerSettings = CommitterSettings.create(consumerConfig);
+        .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class");
     Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
-        .mapAsync(1, event -> { // must be set to 1 for ordered
-          return sendSagaActor(event.record().key(), event.record().value())
-              .thenApply(done -> event.committableOffset());
+        .mapAsync(10, event -> {
+          BaseEvent bean = jsonMapper.readValue(event.record().value(), BaseEvent.class);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("kafka receive {} {}", bean.getGlobalTxId(), bean.getType());
+          }
+          return sendSagaActor(bean).thenApply(done -> event.committableOffset());
         })
-        .toMat(Committer.sink(committerSettings), Keep.both())
-        .mapMaterializedValue(Consumer::createDrainingControl)
+        .batch(
+            100,
+            ConsumerMessage::createCommittableOffsetBatch,
+            ConsumerMessage.CommittableOffsetBatch::updated
+        )
+        .mapAsync(10, offset -> offset.commitJavadsl())
+        .to(Sink.ignore())
         .run(materializer);
   }

-  private CompletionStage<String> sendSagaActor(String key, String value) {
+  private CompletionStage<String> sendSagaActor(BaseEvent event) {
     try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("key {}, value {}", key, value);
-      }
       long begin = System.currentTimeMillis();
       metricsService.metrics().doActorReceived();
-      sagaShardRegionActor.tell(jsonMapper.readValue(value, BaseEvent.class), sagaShardRegionActor);
+      sagaShardRegionActor.tell(event, sagaShardRegionActor);
       long end = System.currentTimeMillis();
       metricsService.metrics().doActorAccepted();
       metricsService.metrics().doActorAvgTime(end - begin);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("send saga actor {} {}", event, event.getType());
+      }
       return CompletableFuture.completedFuture("");
     } catch (Exception ex) {
+      LOG.error(ex.getMessage(),ex);
       metricsService.metrics().doActorRejected();
-      LOG.error("key {}, value {}", key, value);
       throw new CompletionException(ex);
     }
   }