I think what happened in your use case was that the following implicit
from ImplicitConversions.scala kept wrapping the resultant KTable from
filter():

  implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =

leading to stack overflow.

Cheers

On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dru...@arrcus.com>
wrote:

> Hi,
>
> I’m using the org.kafka.streams.scala that was released with version
> 2.0.0. I’m getting a StackOverflowError as follows:
>
> java.lang.StackOverflowError
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>                                                    .
>    .
>    .
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>
> The Scala version I’m using is 2.11.11 and the code leading to the error
> is as follows (particularly the .filter).
>
> val builder = new StreamsBuilder
>
> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>
> val customers = args.config.keys
>
> val predicates = customers.map { customerId =>
>   (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
> }.toSeq
>
> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
>
> val y = Printed.toSysOut[Windowed[Key], Long]
>
> customerIdToStream.foreach { case (customerId, customerStream) =>
>   val customerConfig = args.config(customerId)
>   customerStream
>     .flatMap { case (_, message) =>
>       message.objects.map {
>         case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
>       }
>     }
>     .groupByKey
>
> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
>     .count()
>     .filter { case (_, count) => count >=
> customerConfig.frequencyThreshold }
>     .toStream
>     .print(y)
> }
>
> Is this a bug with the new scala module related to:
> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> Or am I doing something wrong?
>
> Thanks,
> Druhin
>

Reply via email to