I was able to reproduce what you saw with modification
to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
I have logged KAFKA-7316 and am looking for a fix.

FYI

On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dru...@arrcus.com> wrote:

> Isn’t that a bug then? Or can I fix my code somehow?
>
>
>
> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com<mailto:
> yuzhih...@gmail.com>) wrote:
>
> I think what happened in your use case was that the following implicit
> from ImplicitConversions.scala kept wrapping the resultant KTable from
> filter():
>
> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
>
> leading to stack overflow.
>
> Cheers
>
> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dru...@arrcus.com>
> wrote:
>
> > Hi,
> >
> > I’m using the org.kafka.streams.scala that was released with version
> > 2.0.0. I’m getting a StackOverflowError as follows:
> >
> > java.lang.StackOverflowError
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > .
> > .
> > .
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >
> > The Scala version I’m using is 2.11.11 and the code leading to the error
> > is as follows (particularly the .filter).
> >
> > val builder = new StreamsBuilder
> >
> > val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
> >
> > val customers = args.config.keys
> >
> > val predicates = customers.map { customerId =>
> > (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
> > }.toSeq
> >
> > val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
> >
> > val y = Printed.toSysOut[Windowed[Key], Long]
> >
> > customerIdToStream.foreach { case (customerId, customerStream) =>
> > val customerConfig = args.config(customerId)
> > customerStream
> > .flatMap { case (_, message) =>
> > message.objects.map {
> > case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> > }
> > }
> > .groupByKey
> >
> >
> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
> > .count()
> > .filter { case (_, count) => count >=
> > customerConfig.frequencyThreshold }
> > .toStream
> > .print(y)
> > }
> >
> > Is this a bug with the new scala module related to:
> > https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> > Or am I doing something wrong?
> >
> > Thanks,
> > Druhin
> >
>

Reply via email to