This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 6779ea15fa757ba10535391e01dac412f1dbe518 Author: Lei Zhang <coolbee...@gmail.com> AuthorDate: Sat Sep 21 01:06:05 2019 +0800 SCB-1368 Delete the Actor state persistent data after transaction data is saved successfully --- .../servicecomb/pack/alpha/fsm/SagaActor.java | 278 +++++++++++++-------- .../src/main/resources/application.yaml | 19 +- 2 files changed, 180 insertions(+), 117 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index ba4b0ff..4bd536e 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -17,7 +17,9 @@ package org.apache.servicecomb.pack.alpha.fsm; +import akka.actor.PoisonPill; import akka.actor.Props; +import akka.cluster.sharding.ShardRegion; import akka.persistence.fsm.AbstractPersistentFSM; import java.lang.invoke.MethodHandles; import java.util.Arrays; @@ -27,6 +29,7 @@ import java.util.concurrent.TimeUnit; import org.apache.servicecomb.pack.alpha.core.AlphaException; import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType; import org.apache.servicecomb.pack.alpha.core.fsm.TxState; +import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain; import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent; import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain; @@ -72,6 +75,7 @@ public class SagaActor extends when(SagaActorState.IDLE, matchEvent(SagaStartedEvent.class, (event, data) -> { + log(event); sagaBeginTime = System.currentTimeMillis(); SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaBeginCounter(); SagaStartedDomain domainEvent = new SagaStartedDomain(event); @@ -92,6 +96,7 @@ public class SagaActor extends when(SagaActorState.READY, matchEvent(TxStartedEvent.class, SagaData.class, (event, data) -> { + log(event); AddTxEventDomain domainEvent = new AddTxEventDomain(event); if (data.getExpirationTime() != null) { return goTo(SagaActorState.PARTIALLY_ACTIVE) @@ -104,12 +109,14 @@ public class SagaActor extends } ).event(SagaEndedEvent.class, (event, data) -> { + log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent); } ).event(SagaAbortedEvent.class, (event, data) -> { + log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent); @@ -125,6 +132,7 @@ public class SagaActor extends when(SagaActorState.PARTIALLY_ACTIVE, matchEvent(TxEndedEvent.class, SagaData.class, (event, data) -> { + log(event); UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); if (data.getExpirationTime() != null) { return goTo(SagaActorState.PARTIALLY_COMMITTED) @@ -137,6 +145,7 @@ public class SagaActor extends } ).event(TxStartedEvent.class, (event, data) -> { + log(event); AddTxEventDomain domainEvent = new AddTxEventDomain(event); if (data.getExpirationTime() != null) { return stay() @@ -148,6 +157,7 @@ public class SagaActor extends } ).event(SagaTimeoutEvent.class, (event, data) -> { + log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT); return goTo(SagaActorState.SUSPENDED) @@ -155,6 +165,7 @@ public class SagaActor extends } ).event(TxAbortedEvent.class, (event, data) -> { + log(event); UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); return goTo(SagaActorState.FAILED) .applying(domainEvent); @@ -169,6 +180,7 @@ public class SagaActor extends when(SagaActorState.PARTIALLY_COMMITTED, matchEvent(TxStartedEvent.class, (event, data) -> { + log(event); AddTxEventDomain domainEvent = new AddTxEventDomain(event); if (data.getExpirationTime() != null) { return goTo(SagaActorState.PARTIALLY_ACTIVE) @@ -181,6 +193,7 @@ public class SagaActor extends } ).event(TxEndedEvent.class, (event, data) -> { + log(event); UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); if (data.getExpirationTime() != null) { return stay() @@ -192,23 +205,27 @@ public class SagaActor extends } ).event(SagaTimeoutEvent.class, (event, data) -> { + log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent); } ).event(SagaEndedEvent.class, (event, data) -> { + log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED); return goTo(SagaActorState.COMMITTED) .applying(domainEvent); } ).event(SagaAbortedEvent.class, (event, data) -> { + log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED); return goTo(SagaActorState.FAILED).applying(domainEvent); } ).event(TxAbortedEvent.class, (event, data) -> { + log(event); UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); return goTo(SagaActorState.FAILED).applying(domainEvent); } @@ -222,12 +239,14 @@ public class SagaActor extends when(SagaActorState.FAILED, matchEvent(SagaTimeoutEvent.class, SagaData.class, (event, data) -> { + log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent); } ).event(TxCompensatedEvent.class, SagaData.class, (event, data) -> { + log(event); UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); return stay().applying(domainEvent).andThen(exec(_data -> { self().tell(ComponsitedCheckEvent.builder().build(), self()); @@ -235,6 +254,7 @@ public class SagaActor extends } ).event(ComponsitedCheckEvent.class, SagaData.class, (event, data) -> { + log(event); if (hasCompensationSentTx(data) || !data.isTerminated()) { return stay(); } else { @@ -246,6 +266,7 @@ public class SagaActor extends } ).event(SagaAbortedEvent.class, SagaData.class, (event, data) -> { + log(event); data.setTerminated(true); if (hasCommittedTx(data)) { SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED); @@ -264,11 +285,13 @@ public class SagaActor extends } ).event(TxStartedEvent.class, SagaData.class, (event, data) -> { + log(event); AddTxEventDomain domainEvent = new AddTxEventDomain(event); return stay().applying(domainEvent); } ).event(TxEndedEvent.class, SagaData.class, (event, data) -> { + log(event); UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); return stay().applying(domainEvent).andThen(exec(_data -> { TxEntity txEntity = _data.getTxEntityMap().get(event.getLocalTxId()); @@ -287,27 +310,8 @@ public class SagaActor extends when(SagaActorState.COMMITTED, matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class, (event, data) -> { - // 已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理 - // 以下基于 journal-redis 说明: - // 假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key - // journal:persistenceIds - // journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 - // journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr - // - // 1. journal:persistenceIds 是 set 类型, 记录了所有的 globalTxId, 使用 smembers journal:persistenceIds 可以看到 - // 2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 是 zset 类型, 记录了这个事务的所有事件 - // 使用 zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1 可以看到 - // 3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr 是 string 类型, 里面记录这序列号 - // - // 何如清理: - // 通过 deleteMessages 和 deleteSnapshot 可以清理部分数据,但是 highestSequenceNr 还是无法自动删除,需要定期手动清理 - // 遍历 journal:persistenceIds 集合,用每一条数据item拼接成key journal:persisted:item 和 journal:persisted:item:highestSequenceNr - // 如果没有成对出现就说明是已经终止的actor 那么可以将 journal:persisted:item 从 journal:persistenceIds 删除 - // 并删除 journal:persisted:item:highestSequenceNr - // - // 目前可以看到的解释是 https://github.com/akka/akka/issues/21181 - deleteMessages(lastSequenceNr()); - deleteSnapshot(snapshotSequenceNr()); + log(event); + beforeStop(stateName(), data); return stop(); } ) @@ -316,8 +320,8 @@ public class SagaActor extends when(SagaActorState.SUSPENDED, matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class, (event, data) -> { - deleteMessages(lastSequenceNr()); - deleteSnapshot(snapshotSequenceNr()); + log(event); + beforeStop(stateName(), data); return stop(); } ) @@ -326,8 +330,8 @@ public class SagaActor extends when(SagaActorState.COMPENSATED, matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class, (event, data) -> { - deleteMessages(lastSequenceNr()); - deleteSnapshot(snapshotSequenceNr()); + log(event); + beforeStop(stateName(), data); return stop(); } ) @@ -348,13 +352,14 @@ public class SagaActor extends .putSagaData(stateData().getGlobalTxId(), stateData()); } if (LOG.isDebugEnabled()) { - LOG.debug("transition {} {} -> {}", getSelf(), from, to); + LOG.debug("transition {} {} -> {}", stateData().getGlobalTxId(), from, to); } if (to == SagaActorState.COMMITTED || to == SagaActorState.SUSPENDED || to == SagaActorState.COMPENSATED) { self().tell(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.builder().build(), self()); } + LOG.info("transition {} {} -> {}", stateData().getGlobalTxId(), from, to); }) ); @@ -362,102 +367,151 @@ public class SagaActor extends matchStop( Normal(), (state, data) -> { if (LOG.isDebugEnabled()) { - LOG.debug("stop {} {}", data.getGlobalTxId(), state); + LOG.debug("saga actor stopped {} {}", getSelf(), state); } - sagaEndTime = System.currentTimeMillis(); - SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaEndCounter(); - SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaAvgTime(sagaEndTime - sagaBeginTime); - data.setLastState(state); - data.setEndTime(new Date()); - data.setTerminated(true); - SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem()) - .stopSagaData(data.getGlobalTxId(), data); + LOG.info("stopped {} {}", data.getGlobalTxId(), state); } ) ); } - @Override - public SagaData applyEvent(DomainEvent event, SagaData data) { - if (this.recoveryRunning()) { - LOG.info("SagaActor recovery {}",event.getEvent()); - } + private void beforeStop(SagaActorState state, SagaData data){ if (LOG.isDebugEnabled()) { - LOG.debug("SagaActor apply event {}", event.getEvent()); + LOG.debug("stop {} {}", data.getGlobalTxId(), state); } - // log event to SagaData - if (event.getEvent() != null && !(event - .getEvent() instanceof ComponsitedCheckEvent)) { - data.logEvent(event.getEvent()); + try{ + sagaEndTime = System.currentTimeMillis(); + data.setLastState(state); + data.setEndTime(new Date()); + data.setTerminated(true); + SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem()) + .stopSagaData(data.getGlobalTxId(), data); + SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaEndCounter(); + SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()) + .doSagaAvgTime(sagaEndTime - sagaBeginTime); + + // destroy self from cluster shard region + getContext().getParent() + .tell(new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf()); + if (LOG.isDebugEnabled()) { + LOG.debug("destroy saga actor {} from cluster shard region", getSelf()); + } + + // clear self mailbox from persistence + // 已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理 + // 以下基于 journal-redis 说明: + // 假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key + // journal:persistenceIds + // journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 + // journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr + // + // 1. journal:persistenceIds 是 set 类型, 记录了所有的 globalTxId, 使用 smembers journal:persistenceIds 可以看到 + // 2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 是 zset 类型, 记录了这个事务的所有事件 + // 使用 zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1 可以看到 + // 3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr 是 string 类型, 里面记录这序列号 + // + // 何如清理: + // 通过 deleteMessages 和 deleteSnapshot 可以清理部分数据,但是 highestSequenceNr 还是无法自动删除,需要定期手动清理 + // 遍历 journal:persistenceIds 集合,用每一条数据item拼接成key journal:persisted:item 和 journal:persisted:item:highestSequenceNr + // 如果没有成对出现就说明是已经终止的actor 那么可以将 journal:persisted:item 从 journal:persistenceIds 删除 + // 并删除 journal:persisted:item:highestSequenceNr + // + // 目前可以看到的解释是 https://github.com/akka/akka/issues/21181 + deleteMessages(lastSequenceNr()); + deleteSnapshot(snapshotSequenceNr()); + }catch(Exception e){ + LOG.error("stop {} fail",data.getGlobalTxId()); + throw e; } - if (event instanceof SagaStartedDomain) { - SagaStartedDomain domainEvent = (SagaStartedDomain) event; - data.setServiceName(domainEvent.getEvent().getServiceName()); - data.setInstanceId(domainEvent.getEvent().getInstanceId()); - data.setGlobalTxId(domainEvent.getEvent().getGlobalTxId()); - data.setBeginTime(domainEvent.getEvent().getCreateTime()); - data.setExpirationTime(domainEvent.getExpirationTime()); - } else if (event instanceof AddTxEventDomain) { - AddTxEventDomain domainEvent = (AddTxEventDomain) event; - if (!data.getTxEntityMap().containsKey(domainEvent.getEvent().getLocalTxId())) { - TxEntity txEntity = TxEntity.builder() - .serviceName(domainEvent.getEvent().getServiceName()) - .instanceId(domainEvent.getEvent().getInstanceId()) - .globalTxId(domainEvent.getEvent().getGlobalTxId()) - .localTxId(domainEvent.getEvent().getLocalTxId()) - .parentTxId(domainEvent.getEvent().getParentTxId()) - .compensationMethod(domainEvent.getCompensationMethod()) - .payloads(domainEvent.getPayloads()) - .state(domainEvent.getState()) - .beginTime(domainEvent.getEvent().getCreateTime()) - .build(); - data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity); - } else { - LOG.warn("TxEntity {} already exists", domainEvent.getEvent().getLocalTxId()); + } + + @Override + public SagaData applyEvent(DomainEvent event, SagaData data) { + try{ + if (this.recoveryRunning()) { + LOG.info("SagaActor recovery {}",event.getEvent()); + }else if (LOG.isDebugEnabled()) { + LOG.debug("SagaActor apply event {}", event.getEvent()); } - } else if (event instanceof UpdateTxEventDomain) { - UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event; - TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId()); - txEntity.setEndTime(domainEvent.getEvent().getCreateTime()); - if (domainEvent.getState() == TxState.COMMITTED) { - txEntity.setState(domainEvent.getState()); - } else if (domainEvent.getState() == TxState.FAILED) { - txEntity.setState(domainEvent.getState()); - txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads()); - data.getTxEntityMap().forEach((k, v) -> { - if (v.getState() == TxState.COMMITTED) { - // call compensate - compensation(v, data); - } - }); - } else if (domainEvent.getState() == TxState.COMPENSATED) { - // decrement the compensation running counter by one - data.getCompensationRunningCounter().decrementAndGet(); - txEntity.setState(domainEvent.getState()); - LOG.info("compensation is completed {}", txEntity.getLocalTxId()); + // log event to SagaData + if (event.getEvent() != null && !(event + .getEvent() instanceof ComponsitedCheckEvent)) { + data.logEvent(event.getEvent()); } - } else if (event instanceof SagaEndedDomain) { - SagaEndedDomain domainEvent = (SagaEndedDomain) event; - if (domainEvent.getState() == SagaActorState.FAILED) { - data.setTerminated(true); - data.getTxEntityMap().forEach((k, v) -> { - if (v.getState() == TxState.COMMITTED) { - // call compensate - compensation(v, data); - } - }); - } else if (domainEvent.getState() == SagaActorState.SUSPENDED) { - data.setEndTime(event.getEvent().getCreateTime()); - data.setTerminated(true); - data.setSuspendedType(domainEvent.getSuspendedType()); - } else if (domainEvent.getState() == SagaActorState.COMPENSATED) { - data.setEndTime(event.getEvent().getCreateTime()); - data.setTerminated(true); - } else if (domainEvent.getState() == SagaActorState.COMMITTED) { - data.setEndTime(event.getEvent().getCreateTime()); - data.setTerminated(true); + if (event instanceof SagaStartedDomain) { + SagaStartedDomain domainEvent = (SagaStartedDomain) event; + data.setServiceName(domainEvent.getEvent().getServiceName()); + data.setInstanceId(domainEvent.getEvent().getInstanceId()); + data.setGlobalTxId(domainEvent.getEvent().getGlobalTxId()); + data.setBeginTime(domainEvent.getEvent().getCreateTime()); + data.setExpirationTime(domainEvent.getExpirationTime()); + } else if (event instanceof AddTxEventDomain) { + AddTxEventDomain domainEvent = (AddTxEventDomain) event; + if (!data.getTxEntityMap().containsKey(domainEvent.getEvent().getLocalTxId())) { + TxEntity txEntity = TxEntity.builder() + .serviceName(domainEvent.getEvent().getServiceName()) + .instanceId(domainEvent.getEvent().getInstanceId()) + .globalTxId(domainEvent.getEvent().getGlobalTxId()) + .localTxId(domainEvent.getEvent().getLocalTxId()) + .parentTxId(domainEvent.getEvent().getParentTxId()) + .compensationMethod(domainEvent.getCompensationMethod()) + .payloads(domainEvent.getPayloads()) + .state(domainEvent.getState()) + .beginTime(domainEvent.getEvent().getCreateTime()) + .build(); + data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity); + } else { + LOG.warn("TxEntity {} already exists", domainEvent.getEvent().getLocalTxId()); + } + } else if (event instanceof UpdateTxEventDomain) { + UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event; + TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId()); + txEntity.setEndTime(domainEvent.getEvent().getCreateTime()); + if (domainEvent.getState() == TxState.COMMITTED) { + txEntity.setState(domainEvent.getState()); + } else if (domainEvent.getState() == TxState.FAILED) { + txEntity.setState(domainEvent.getState()); + txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads()); + data.getTxEntityMap().forEach((k, v) -> { + if (v.getState() == TxState.COMMITTED) { + // call compensate + compensation(v, data); + } + }); + } else if (domainEvent.getState() == TxState.COMPENSATED) { + // decrement the compensation running counter by one + data.getCompensationRunningCounter().decrementAndGet(); + txEntity.setState(domainEvent.getState()); + LOG.info("compensation is completed {}", txEntity.getLocalTxId()); + } + } else if (event instanceof SagaEndedDomain) { + SagaEndedDomain domainEvent = (SagaEndedDomain) event; + if (domainEvent.getState() == SagaActorState.FAILED) { + data.setTerminated(true); + data.getTxEntityMap().forEach((k, v) -> { + if (v.getState() == TxState.COMMITTED) { + // call compensate + compensation(v, data); + } + }); + } else if (domainEvent.getState() == SagaActorState.SUSPENDED) { + data.setEndTime(event.getEvent().getCreateTime()); + data.setTerminated(true); + data.setSuspendedType(domainEvent.getSuspendedType()); + } else if (domainEvent.getState() == SagaActorState.COMPENSATED) { + data.setEndTime(event.getEvent().getCreateTime()); + data.setTerminated(true); + } else if (domainEvent.getState() == SagaActorState.COMMITTED) { + data.setEndTime(event.getEvent().getCreateTime()); + data.setTerminated(true); + } } + }catch (Exception ex){ + LOG.error("SagaActor apply event {}", event.getEvent()); + beforeStop(SagaActorState.SUSPENDED, data); + stop(); + //TODO 增加 SagaActor 处理失败指标 } return data; } @@ -531,4 +585,10 @@ public class SagaActor extends } } } + + private void log(BaseEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug(event.toString()); + } + } } diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml index 98a58ce..664f692 100644 --- a/alpha/alpha-server/src/main/resources/application.yaml +++ b/alpha/alpha-server/src/main/resources/application.yaml @@ -90,7 +90,7 @@ akkaConfig: acceptable-heartbeat-pause: 6s seed-nodes: ["akka://alpha-cluster@127.0.0.1:8070"] sharding: - state-store-mode: "persistence" + state-store-mode: "ddata" #ddata,persistence remember-entities: true shard-failure-backoff: 5s @@ -173,21 +173,24 @@ akkaConfig: commit-timeout: 15s commit-time-warning: 1s commit-refresh-interval: infinite - use-dispatcher: "akka.kafka.default-dispatcher" + use-dispatcher: "akka.kafka.saga-kafka" kafka-clients.enable.auto.commit: false wait-close-partition: 500ms - position-timeout: 5s - offset-for-times-timeout: 5s - metadata-request-timeout: 5s + position-timeout: 10s + offset-for-times-timeout: 10s + metadata-request-timeout: 10s eos-draining-check-interval: 30ms partition-handler-warning: 5s connection-checker.enable: false connection-checker.max-retries: 3 connection-checker.check-interval: 15s connection-checker.backoff-factor: 2.0 - max-batch: 1000 - max-interval: 10s - parallelism: 1 + saga-kafka: + type: "Dispatcher" + executor: "thread-pool-executor" + thread-pool-executor: + fixed-pool-size: 20 + akka-persistence-redis: redis: