I was able to reproduce what you saw with a modification to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala. I have logged KAFKA-7316 and am looking for a fix.
FYI

On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dru...@arrcus.com> wrote:

> Isn’t that a bug then? Or can I fix my code somehow?
>
> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com) wrote:
>
> I think what happened in your use case was that the following implicit
> from ImplicitConversions.scala kept wrapping the resultant KTable from
> filter():
>
> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
>
> leading to stack overflow.
>
> Cheers
>
> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dru...@arrcus.com>
> wrote:
>
> > Hi,
> >
> > I’m using the org.kafka.streams.scala that was released with version
> > 2.0.0. I’m getting a StackOverflowError as follows:
> >
> > java.lang.StackOverflowError
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > .
> > .
> > .
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >
> > The Scala version I’m using is 2.11.11 and the code leading to the error
> > is as follows (particularly the .filter).
> >
> > val builder = new StreamsBuilder
> >
> > val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
> >
> > val customers = args.config.keys
> >
> > val predicates = customers.map { customerId =>
> >   (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
> > }.toSeq
> >
> > val customerIdToStream = customers.zip(stream.branch(predicates: _*)).toMap
> >
> > val y = Printed.toSysOut[Windowed[Key], Long]
> >
> > customerIdToStream.foreach { case (customerId, customerStream) =>
> >   val customerConfig = args.config(customerId)
> >   customerStream
> >     .flatMap { case (_, message) =>
> >       message.objects.map {
> >         case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> >       }
> >     }
> >     .groupByKey
> >     .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
> >     .count()
> >     .filter { case (_, count) => count >= customerConfig.frequencyThreshold }
> >     .toStream
> >     .print(y)
> > }
> >
> > Is this a bug with the new scala module related to:
> > https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> > Or am I doing something wrong?
> >
> > Thanks,
> > Druhin
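
For anyone following the thread, the wrapping behaviour described above (an implicit conversion re-wrapping the Java table so that filter() keeps calling itself) can be reproduced without Kafka. The sketch below is only an illustration under stated assumptions: Pred, JTable, BuggyTable, FixedTable and wrap are hypothetical stand-ins, not the real Kafka Streams classes, and the "fixed" variant is one possible way to avoid the recursion, not necessarily what the KAFKA-7316 patch does. It assumes Scala 2, where a function value passed to a method expecting a non-function interface does not type-check directly, so the compiler falls back on the implicit view.

```scala
import scala.language.implicitConversions

object ImplicitRecursionSketch {
  // Hypothetical stand-in for a Java-style predicate interface.
  trait Pred[V] { def test(v: V): Boolean }

  // Hypothetical stand-in for the Java table (KTableJ in the thread).
  class JTable[V](val rows: List[V]) {
    def filter(p: Pred[V]): JTable[V] = new JTable(rows.filter(v => p.test(v)))
  }

  // Plays the role of wrapKTable: silently wraps a Java table.
  implicit def wrap[V](inner: JTable[V]): BuggyTable[V] = new BuggyTable(inner)

  class BuggyTable[V](val inner: JTable[V]) {
    // `p` is a V => Boolean, not a Pred[V], so JTable.filter does not apply.
    // The compiler rewrites the call to wrap(inner).filter(p), which is this
    // very method on a fresh wrapper, so it never terminates.
    def filter(p: V => Boolean): BuggyTable[V] = inner.filter(p)
  }

  class FixedTable[V](val inner: JTable[V]) {
    // Building the Pred explicitly makes JTable.filter applicable, so no
    // implicit view is inserted on `inner`.
    def filter(p: V => Boolean): FixedTable[V] =
      new FixedTable(inner.filter(new Pred[V] { def test(v: V): Boolean = p(v) }))
  }

  // Runs `body` and reports whether it blew the stack.
  def overflows(body: => Any): Boolean =
    try { body; false } catch { case _: StackOverflowError => true }

  def main(args: Array[String]): Unit = {
    val table = new JTable(List(1L, 5L, 10L))
    val atLeastFive = (n: Long) => n >= 5L
    println(overflows(new BuggyTable(table).filter(atLeastFive)))
    println(new FixedTable(table).filter(atLeastFive).inner.rows)
  }
}
```

The point of the sketch is that the buggy call compiles cleanly: nothing in the source looks recursive, because the self-call is inserted by the implicit view. If the real cause matches this pattern, user code that calls .filter on a windowed count cannot work around it from the outside other than by avoiding that method on the Scala wrapper.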