This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch minor-kafka-updaes
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 08d2985bfc5d4d75676f36599eb9986d8475af2f
Author: Christopher Batey <[email protected]>
AuthorDate: Tue Mar 17 10:38:36 2020 +0000

    Minor updates to the kafka sample while recording demo
---
 ...utable.Vector.nullSlotAndCopy(Object[][],_int).bgv | Bin 18702 -> 0 bytes
 ...utable.Vector.nullSlotAndCopy(Object[][],_int).cfg |   5 -----
 ....VectorPointer.gotoPosWritable1(int,_int,_int).bgv | Bin 9978 -> 0 bytes
 ....VectorPointer.gotoPosWritable1(int,_int,_int).cfg |   5 -----
 ...a.collection.immutable.Vector.appended(Object).bgv | Bin 251627 -> 0 bytes
 ...a.collection.immutable.Vector.appended(Object).cfg |   5 -----
 ...utable.Vector.nullSlotAndCopy(Object[][],_int).bgv | Bin 18700 -> 0 bytes
 ...utable.Vector.nullSlotAndCopy(Object[][],_int).cfg |   5 -----
 .../client/src/main/scala/client/ClientApp.scala      |   2 +-
 .../processor/src/main/resources/application.conf     |   2 +-
 .../src/main/scala/sample/sharding/kafka/Main.scala   |  18 +++++++++++++++---
 .../sharding/kafka/UserEventsKafkaProcessor.scala     |   2 +-
 12 files changed, 18 insertions(+), 26 deletions(-)

diff --git 
a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv
 
b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv
deleted file mode 100644
index 317ce7a..0000000
Binary files 
a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv
 and /dev/null differ
diff --git 
a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
 
b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
deleted file mode 100644
index 9a4840a..0000000
--- 
a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
+++ /dev/null
@@ -1,5 +0,0 @@
-begin_compilation
-  name " 
HotSpotCompilation-36914[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],
 int)]"
-  method 
"HotSpotCompilation-36914[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],
 int)]"
-  date 1583857594397
-end_compilation
diff --git 
a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv
 
b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv
deleted file mode 100644
index 190c24c..0000000
Binary files 
a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv
 and /dev/null differ
diff --git 
a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg
 
b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg
deleted file mode 100644
index b8126a4..0000000
--- 
a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg
+++ /dev/null
@@ -1,5 +0,0 @@
-begin_compilation
-  name " 
HotSpotCompilation-37214[scala.collection.immutable.VectorPointer.gotoPosWritable1(int,
 int, int)]"
-  method 
"HotSpotCompilation-37214[scala.collection.immutable.VectorPointer.gotoPosWritable1(int,
 int, int)]"
-  date 1583859506333
-end_compilation
diff --git 
a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv
 
b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv
deleted file mode 100644
index 0bc4add..0000000
Binary files 
a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv
 and /dev/null differ
diff --git 
a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg
 
b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg
deleted file mode 100644
index 67a37d2..0000000
--- 
a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg
+++ /dev/null
@@ -1,5 +0,0 @@
-begin_compilation
-  name " 
HotSpotCompilation-35789[scala.collection.immutable.Vector.appended(Object)]"
-  method 
"HotSpotCompilation-35789[scala.collection.immutable.Vector.appended(Object)]"
-  date 1583857658632
-end_compilation
diff --git 
a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv
 
b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv
deleted file mode 100644
index 259b2be..0000000
Binary files 
a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv
 and /dev/null differ
diff --git 
a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
 
b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
deleted file mode 100644
index 9c7e37a..0000000
--- 
a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
+++ /dev/null
@@ -1,5 +0,0 @@
-begin_compilation
-  name " 
HotSpotCompilation-35679[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],
 int)]"
-  method 
"HotSpotCompilation-35679[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],
 int)]"
-  date 1583857594379
-end_compilation
diff --git 
a/akka-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala
 
b/akka-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala
index 49acef5..8254f0b 100644
--- 
a/akka-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala
+++ 
b/akka-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala
@@ -15,7 +15,7 @@ object ClientApp extends App {
   implicit val system: ActorSystem = ActorSystem("UserClient")
   implicit val mat: Materializer = Materializer.createMaterializer(system)
   implicit val ec: ExecutionContextExecutor = system.dispatcher
-  val clientSettings = GrpcClientSettings.connectToServiceAt("localhost", 8081).withTls(false)
+  val clientSettings = GrpcClientSettings.connectToServiceAt("127.0.0.1", 8081).withTls(false)
   val client = UserServiceClient(clientSettings)
 
   var userId = ""
diff --git 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
index 6f5594d..921e182 100644
--- 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
+++ 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
@@ -2,7 +2,7 @@ kafka-to-sharding-processor {
   bootstrap-servers = "localhost:9092"
   topics = ["user-events"]
   group = "user-processing"
-  ask-timeout = 5s
+  ask-timeout = 10s
 }
 
 akka.http {
diff --git 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index 283c9d1..893dd72 100644
--- 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -16,6 +16,7 @@ import scala.util.{Failure, Success}
 sealed trait Command
 case object NodeMemberUp extends Command
 final case class ShardingStarted(region: ActorRef[UserEvents.Command]) extends Command
+final case class BindingFailed(reason: Throwable) extends Command
 
 object Main {
   def main(args: Array[String]): Unit = {
@@ -50,10 +51,17 @@ object Main {
     }, "KafkaToSharding", config(remotingPort, akkaManagementPort))
 
     def start(ctx: ActorContext[Command], region: ActorRef[UserEvents.Command], settings: ProcessorSettings): Behavior[Command] = {
+      import ctx.executionContext
       ctx.log.info("Sharding started and joined cluster. Starting event processor")
       val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(region, settings), "kafka-event-processor")
       val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, region)
-      running(binding, eventProcessor)
+      binding.onComplete {
+        case Success(bound) =>
+          ctx.log.info("Bound: {}", bound)
+        case Failure(t) =>
+          ctx.self ! BindingFailed(t)
+      }
+      running(ctx, binding, eventProcessor)
     }
 
     def starting(ctx: ActorContext[Command], sharding: Option[ActorRef[UserEvents.Command]], joinedCluster: Boolean, settings: ProcessorSettings): Behavior[Command] = Behaviors
@@ -72,8 +80,12 @@ object Main {
           starting(ctx, sharding, joinedCluster = true, settings)
       }
 
-    def running(binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] = Behaviors
-      .receiveSignal {
+    def running(ctx: ActorContext[Command], binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] =
+      Behaviors.receiveMessagePartial[Command] {
+        case BindingFailed(t) =>
+          ctx.log.error("Failed to bind front end", t)
+          Behaviors.stopped
+      }.receiveSignal {
         case (ctx, Terminated(`processor`)) =>
           ctx.log.warn("Kafka event processor stopped. Shutting down")
           binding.map(_.unbind())(ctx.executionContext)
diff --git 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 9ad8305..ae68801 100644
--- 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -49,7 +49,7 @@ object UserEventsKafkaProcessor {
                   purchaseProto.price,
                   replyTo)
               })(processorSettings.askTimeout, ctx.system.scheduler),
-              attempts = 3,
+              attempts = 5,
               delay = 1.second
             )
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to