This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch minor-kafka-updaes in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 08d2985bfc5d4d75676f36599eb9986d8475af2f Author: Christopher Batey <[email protected]> AuthorDate: Tue Mar 17 10:38:36 2020 +0000 Minor updates to the kafka sample while recording demo --- ...utable.Vector.nullSlotAndCopy(Object[][],_int).bgv | Bin 18702 -> 0 bytes ...utable.Vector.nullSlotAndCopy(Object[][],_int).cfg | 5 ----- ....VectorPointer.gotoPosWritable1(int,_int,_int).bgv | Bin 9978 -> 0 bytes ....VectorPointer.gotoPosWritable1(int,_int,_int).cfg | 5 ----- ...a.collection.immutable.Vector.appended(Object).bgv | Bin 251627 -> 0 bytes ...a.collection.immutable.Vector.appended(Object).cfg | 5 ----- ...utable.Vector.nullSlotAndCopy(Object[][],_int).bgv | Bin 18700 -> 0 bytes ...utable.Vector.nullSlotAndCopy(Object[][],_int).cfg | 5 ----- .../client/src/main/scala/client/ClientApp.scala | 2 +- .../processor/src/main/resources/application.conf | 2 +- .../src/main/scala/sample/sharding/kafka/Main.scala | 18 +++++++++++++++--- .../sharding/kafka/UserEventsKafkaProcessor.scala | 2 +- 12 files changed, 18 insertions(+), 26 deletions(-) diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv deleted file mode 100644 index 317ce7a..0000000 Binary files a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv and /dev/null differ diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg deleted file mode 100644 index 9a4840a..0000000 --- a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg +++ /dev/null @@ -1,5 +0,0 @@ -begin_compilation - name " HotSpotCompilation-36914[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]" - method "HotSpotCompilation-36914[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]" - date 1583857594397 -end_compilation diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv deleted file mode 100644 index 190c24c..0000000 Binary files a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv and /dev/null differ diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg deleted file mode 100644 index b8126a4..0000000 --- a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg +++ /dev/null @@ -1,5 +0,0 @@ -begin_compilation - name " HotSpotCompilation-37214[scala.collection.immutable.VectorPointer.gotoPosWritable1(int, int, int)]" - method "HotSpotCompilation-37214[scala.collection.immutable.VectorPointer.gotoPosWritable1(int, int, int)]" - date 1583859506333 -end_compilation diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv deleted file mode 100644 index 0bc4add..0000000 Binary files a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv and /dev/null differ diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg deleted file mode 100644 index 67a37d2..0000000 --- a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg +++ /dev/null @@ -1,5 +0,0 @@ -begin_compilation - name " HotSpotCompilation-35789[scala.collection.immutable.Vector.appended(Object)]" - method "HotSpotCompilation-35789[scala.collection.immutable.Vector.appended(Object)]" - date 1583857658632 -end_compilation diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv deleted file mode 100644 index 259b2be..0000000 Binary files a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv and /dev/null differ diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg deleted file mode 100644 index 9c7e37a..0000000 --- a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg +++ /dev/null @@ -1,5 +0,0 @@ -begin_compilation - name " HotSpotCompilation-35679[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]" - method "HotSpotCompilation-35679[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]" - date 1583857594379 -end_compilation diff --git a/akka-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala b/akka-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala index 49acef5..8254f0b 100644 --- a/akka-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala +++ b/akka-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala @@ -15,7 +15,7 @@ object ClientApp extends App { implicit val system: ActorSystem = ActorSystem("UserClient") implicit val mat: Materializer = Materializer.createMaterializer(system) implicit val ec: ExecutionContextExecutor = system.dispatcher - val clientSettings = GrpcClientSettings.connectToServiceAt("localhost", 8081).withTls(false) + val clientSettings = GrpcClientSettings.connectToServiceAt("127.0.0.1", 8081).withTls(false) val client = UserServiceClient(clientSettings) var userId = "" diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf index 6f5594d..921e182 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf @@ -2,7 +2,7 @@ kafka-to-sharding-processor { bootstrap-servers = "localhost:9092" topics = ["user-events"] group = "user-processing" - ask-timeout = 5s + ask-timeout = 10s } akka.http { diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala index 283c9d1..893dd72 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala @@ -16,6 +16,7 @@ import scala.util.{Failure, Success} sealed trait Command case object NodeMemberUp extends Command final case class ShardingStarted(region: ActorRef[UserEvents.Command]) extends Command +final case class BindingFailed(reason: Throwable) extends Command object Main { def main(args: Array[String]): Unit = { @@ -50,10 +51,17 @@ object Main { }, "KafkaToSharding", config(remotingPort, akkaManagementPort)) def start(ctx: ActorContext[Command], region: ActorRef[UserEvents.Command], settings: ProcessorSettings): Behavior[Command] = { + import ctx.executionContext ctx.log.info("Sharding started and joined cluster. Starting event processor") val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(region, settings), "kafka-event-processor") val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, region) - running(binding, eventProcessor) + binding.onComplete { + case Success(bound) => + ctx.log.info("Bound: {}", bound) + case Failure(t) => + ctx.self ! BindingFailed(t) + } + running(ctx, binding, eventProcessor) } def starting(ctx: ActorContext[Command], sharding: Option[ActorRef[UserEvents.Command]], joinedCluster: Boolean, settings: ProcessorSettings): Behavior[Command] = Behaviors @@ -72,8 +80,12 @@ object Main { starting(ctx, sharding, joinedCluster = true, settings) } - def running(binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] = Behaviors - .receiveSignal { + def running(ctx: ActorContext[Command], binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] = + Behaviors.receiveMessagePartial[Command] { + case BindingFailed(t) => + ctx.log.error("Failed to bind front end", t) + Behaviors.stopped + }.receiveSignal { case (ctx, Terminated(`processor`)) => ctx.log.warn("Kafka event processor stopped. Shutting down") binding.map(_.unbind())(ctx.executionContext) diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala index 9ad8305..ae68801 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala @@ -49,7 +49,7 @@ object UserEventsKafkaProcessor { purchaseProto.price, replyTo) })(processorSettings.askTimeout, ctx.system.scheduler), - attempts = 3, + attempts = 5, delay = 1.second ) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
