Apologies for the alarmist subject, but we're having a very difficult time getting supervision working solidly for a committablePartitionedSource.
Latest issue is that someone used the Kafka REST Proxy to post an invalid message (we *are* using the schema registry). When running our consumer, that resulted in:

    Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 4
    Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403

This then manifested in the stream's onFailure handler as:

    Caused by: java.lang.Exception: Consumer actor terminated
        at akka.kafka.internal.SubSourceLogic$SubSourceStage$$anon$1$$anonfun$preStart$2.apply(SubSourceLogic.scala:150)

At that point there is no way to skip the element by resuming, and the stream is stuck: it is stopped (and restarted by our host actor) with no offset committed. The "Consumer actor terminated" exception has no cause value.

I've tried attaching a Supervision.Decider to the partition source, the interval source, and the merge of the two:

    val mergedSource: Source[CommittableNormalizedInterval, Consumer.Control] =
      sources
        .withAttributes(attributesForPartitionSource)
        .flatMapMerge(breadth = consumerParallelism,
          a => a._2.withAttributes(attributesForIntervalSource))

    val result = mergedSource
      .withAttributes(attributesForIntervalSource)
      .via(partitionFlow)
      .toMat(Sink.ignore)(Keep.both)

But those deciders are never reached, so I appear to have no way to detect the serialization problem and skip the element.

On Fri, Oct 6, 2017 at 10:30 AM, Richard Rodseth <rrods...@gmail.com> wrote:

> I'm using akka-kafka with committablePartitionedSource, meaning I have a
> source of (partition, innersource) tuples.
>
> I'm currently using the flatMapMerge recipe from
> https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition
>
>     val done = Consumer.committablePartitionedSource(consumerSettings,
>         Subscriptions.topics("topic1"))
>       .flatMapMerge(maxPartitions, _._2)
>       .via(business)
>       .batch(max = 20,
>         first => CommittableOffsetBatch.empty.updated(first.committableOffset)) {
>         (batch, elem) => batch.updated(elem.committableOffset)
>       }
>       .mapAsync(3)(_.commitScaladsl())
>       .runWith(Sink.ignore)
>
> If I have a default supervision strategy which resumes, set at the
> materializer level, but want to stop the stream (which in my case restarts
> a host actor with backoff) at the source level, do I need to do a
> withAttributes(attributesForStoppingDecider) at both the outer source and
> inner source levels? i.e.
>
>     val done = Consumer.committablePartitionedSource(consumerSettings,
>         Subscriptions.topics("topic1"))
>       .withAttributes(attributesForStoppingDecider)
>       .flatMapMerge(maxPartitions,
>         _._2.withAttributes(attributesForStoppingDecider))
>       .via(business)
>
> Or can I do it after the flatMapMerge? Question motivated by this open
> ticket, which I'm not sure I fully understand yet:
>
> https://github.com/akka/akka/issues/23066
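The only workaround I'm currently considering (a sketch, not something we've put in production) is to configure the ConsumerSettings with a plain ByteArrayDeserializer and do the schema-registry deserialization in a map stage inside the stream, where the failure is an ordinary element-processing exception that a decider (or an explicit Try) can actually see. The names deserialize and deserializeOrSkip below are hypothetical stand-ins; the real call would go through the Confluent Avro deserializer:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the schema-registry-backed deserializer,
// which throws on a poison message (e.g. "Schema not found; error code: 40403").
def deserialize(bytes: Array[Byte]): String =
  if (bytes.isEmpty) throw new RuntimeException("Schema not found; error code: 40403")
  else new String(bytes, "UTF-8")

// Turn the throwing deserializer into a total function: a bad record
// becomes None (logged and dropped) instead of a failure inside the
// consumer, which surfaces only as "Consumer actor terminated".
def deserializeOrSkip(bytes: Array[Byte]): Option[String] =
  Try(deserialize(bytes)) match {
    case Success(value) => Some(value)
    case Failure(e)     => println(s"Skipping bad record: ${e.getMessage}"); None
  }

// In the stream this would sit right after the source, e.g.
//   source.map(msg => (msg.committableOffset, deserializeOrSkip(msg.record.value)))
// so the offset of the skipped message can still be committed.
```

The important part is keeping the committable offset paired with the (possibly empty) decoded value, so the poison message's offset is still committed and the stream doesn't get stuck re-reading it after a restart.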