This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit f3472e03936ab782806234a942d0f1a2d21e9c8d
Author: Lei Zhang <coolbee...@gmail.com>
AuthorDate: Sat Sep 21 00:51:38 2019 +0800

    SCB-1368 Kafka at-least-once delivery
---
 .../fsm/channel/kafka/KafkaSagaEventConsumer.java  | 41 ++++++++++++----------
 1 file changed, 23 insertions(+), 18 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
index b816302..7790c12 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
@@ -19,14 +19,13 @@ package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.kafka.CommitterSettings;
+import akka.kafka.ConsumerMessage;
 import akka.kafka.ConsumerSettings;
 import akka.kafka.Subscriptions;
-import akka.kafka.javadsl.Committer;
 import akka.kafka.javadsl.Consumer;
 import akka.stream.ActorMaterializer;
 import akka.stream.Materializer;
-import akka.stream.javadsl.Keep;
+import akka.stream.javadsl.Sink;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import java.lang.invoke.MethodHandles;
@@ -51,6 +50,7 @@ public class KafkaSagaEventConsumer extends AbstractEventConsumer {
       MetricsService metricsService, String bootstrap_servers, String topic) {
     super(actorSystem, sagaShardRegionActor, metricsService);
 
+
     // init consumer
     final Materializer materializer = ActorMaterializer.create(actorSystem);
     final Config consumerConfig = actorSystem.settings().config().getConfig("akka.kafka.consumer");
@@ -60,37 +60,42 @@ public class KafkaSagaEventConsumer extends AbstractEventConsumer {
             .withBootstrapServers(bootstrap_servers)
             .withGroupId(groupId)
             .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-            .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
             .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
             .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class")
-            .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                "StringDeserializer.class");
-    CommitterSettings committerSettings = CommitterSettings.create(consumerConfig);
+            .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class");
     Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
-        .mapAsync(1, event -> { // must be set to 1 for ordered
-          return sendSagaActor(event.record().key(), event.record().value())
-              .thenApply(done -> event.committableOffset());
+        .mapAsync(10, event -> {
+          BaseEvent bean = jsonMapper.readValue(event.record().value(), BaseEvent.class);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("kafka receive {} {}", bean.getGlobalTxId(), bean.getType());
+          }
+          return sendSagaActor(bean).thenApply(done -> event.committableOffset());
         })
-        .toMat(Committer.sink(committerSettings), Keep.both())
-        .mapMaterializedValue(Consumer::createDrainingControl)
+        .batch(
+            100,
+            ConsumerMessage::createCommittableOffsetBatch,
+            ConsumerMessage.CommittableOffsetBatch::updated
+        )
+        .mapAsync(10, offset -> offset.commitJavadsl())
+        .to(Sink.ignore())
         .run(materializer);
   }
 
-  private CompletionStage<String> sendSagaActor(String key, String value) {
+  private CompletionStage<String> sendSagaActor(BaseEvent event) {
     try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("key {}, value {}", key, value);
-      }
       long begin = System.currentTimeMillis();
       metricsService.metrics().doActorReceived();
-      sagaShardRegionActor.tell(jsonMapper.readValue(value, BaseEvent.class), sagaShardRegionActor);
+      sagaShardRegionActor.tell(event, sagaShardRegionActor);
       long end = System.currentTimeMillis();
       metricsService.metrics().doActorAccepted();
       metricsService.metrics().doActorAvgTime(end - begin);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("send saga actor {} {}", event, event.getType());
+      }
       return CompletableFuture.completedFuture("");
     } catch (Exception ex) {
+      LOG.error(ex.getMessage(),ex);
       metricsService.metrics().doActorRejected();
-      LOG.error("key {}, value {}", key, value);
       throw new CompletionException(ex);
     }
   }
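
Note on the pattern used above: the commit implements at-least-once delivery by disabling auto-commit and only committing Kafka offsets after an event has been handed to the saga shard region actor. Offsets flow through the stream behind the processing step, are grouped with batch(), and are committed with commitJavadsl(); if the alpha node stops before a commit, the uncommitted records are redelivered on restart, so an event may be duplicated but is not lost. Below is a minimal, standalone sketch of the same Alpakka Kafka (akka-stream-kafka 1.x Java DSL) pipeline for reference only; the bootstrap servers, group id, topic name and processRecord() callback are illustrative placeholders, not part of this commit.

    // Sketch only: at-least-once consumption with batched offset commits,
    // following the committableSource -> mapAsync -> batch -> commit shape of the change above.
    // "localhost:9092", "sketch-group", "my-topic" and processRecord(...) are placeholders.
    import akka.actor.ActorSystem;
    import akka.kafka.ConsumerMessage;
    import akka.kafka.ConsumerSettings;
    import akka.kafka.Subscriptions;
    import akka.kafka.javadsl.Consumer;
    import akka.stream.ActorMaterializer;
    import akka.stream.Materializer;
    import akka.stream.javadsl.Sink;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class AtLeastOnceConsumerSketch {

      public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("sketch");
        Materializer materializer = ActorMaterializer.create(system);

        ConsumerSettings<String, String> settings =
            ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
                .withBootstrapServers("localhost:9092")       // placeholder
                .withGroupId("sketch-group")                  // placeholder
                .withProperty("enable.auto.commit", "false"); // commit manually, after processing

        Consumer.committableSource(settings, Subscriptions.topics("my-topic")) // placeholder topic
            // Process the record first; only then expose its offset for committing.
            .mapAsync(10, msg ->
                processRecord(msg.record().value()).thenApply(done -> msg.committableOffset()))
            // Group offsets so fewer commit requests are sent to the broker.
            .batch(100,
                ConsumerMessage::createCommittableOffsetBatch,
                ConsumerMessage.CommittableOffsetBatch::updated)
            // Commit the batch; a failure here means records may be redelivered, never dropped.
            .mapAsync(10, batch -> batch.commitJavadsl())
            .to(Sink.ignore())
            .run(materializer);
      }

      // Placeholder for application-specific handling (the commit sends to a saga actor instead).
      private static CompletionStage<String> processRecord(String value) {
        return CompletableFuture.completedFuture(value);
      }
    }

One design consequence worth noting: the commit raises mapAsync parallelism from 1 to 10, so strict per-partition processing order is traded for throughput, while the batched commit keeps the delivery guarantee at least-once.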
