Apologies for the alarmist subject, but we're having a very difficult time
getting supervision solid for a committablePartitionedSource.

Latest issue is that someone used the Kafka REST Proxy to post an invalid
message (we *are* using the schema registry). When running our consumer,
that resulted in

Caused by: org.apache.kafka.common.errors.SerializationException: Error
retrieving Avro schema for id 4

Caused by:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
Schema not found; error code: 40403
This then manifested in the stream's onFailure handler as

Caused by: java.lang.Exception: Consumer actor terminated

at
akka.kafka.internal.SubSourceLogic$SubSourceStage$$anon$1$$anonfun$preStart$2.apply(SubSourceLogic.scala:150)

At that point there is no way to skip the element by resuming, and the
stream is stuck because it is stopped (and restarted by our host actor)
with no offset committed. The "Consumer actor terminated" exception has no
cause value.

I've tried attaching a Supervision.Decider to the partition source, the
intervals source and the merge thereof.

    val mergedSource: Source[CommittableNormalizedInterval, Consumer.Control]
=

      sources.withAttributes(attributesForPartitionSource)

        .flatMapMerge(breadth = consumerParallelism, a => a._2.
withAttributes(attributesForIntervalSource))

    val result = mergedSource.withAttributes(attributesForIntervalSource)

      .via(partitionFlow)

      .toMat(Sink.ignore)(Keep.both)
But those deciders are not reached, so I appear to have no way to detect
the serialization problem and skip the element.

On Fri, Oct 6, 2017 at 10:30 AM, Richard Rodseth <rrods...@gmail.com> wrote:

> I'm using akka-kafka with committablePartitionedSource, meaning I have a
> source of (partition, innersource) tuples.
>
> I'm currently using the flatMapMerge recipe from
>
> https://doc.akka.io/docs/akka-stream-kafka/current/consumer.
> html#source-per-partition
>
> val done = Consumer.committablePartitionedSource(consumerSettings, 
> Subscriptions.topics("topic1"))
>   .flatMapMerge(maxPartitions, _._2)
>   .via(business)
>   .batch(max = 20, first => 
> CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, 
> elem) =>
>     batch.updated(elem.committableOffset)
>   }
>   .mapAsync(3)(_.commitScaladsl())
>   .runWith(Sink.ignore)
>
> If I have a default supervision strategy which resumes, set at the
> materializer level, but want to stop the stream (which in my case restarts
> a host actor with backoff) at the source level, do I need to do a
> withAttributes(attributesForStoppingDecider) at both the outer source and
> inner source leves?
> i.e.
>
> val done = Consumer.committablePartitionedSource(consumerSettings, 
> Subscriptions.topics("topic1"))
>   .withAttributes(attributesForStoppingDecider).flatMapMerge(maxPartitions, 
> _._2.withAttributes(attributesForStoppingDecider))
> .via(business)
>
> or can I do it after the flatMapMerge? Question motivated by this open
> ticket which I'm not sure I fully understand yet.
>
> https://github.com/akka/akka/issues/23066
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to