Thanks for reporting and for creating the ticket!

-Matthias

On 8/20/18 5:17 PM, Ted Yu wrote:
> I was able to reproduce what you saw with modification
> to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> I have logged KAFKA-7316 and am looking for a fix.
> 
> FYI
> 
> On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dru...@arrcus.com> wrote:
> 
>> Isn’t that a bug then? Or can I fix my code somehow?
>>
>>
>>
>> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com<mailto:
>> yuzhih...@gmail.com>) wrote:
>>
>> I think what happened in your use case was that the following implicit
>> from ImplicitConversions.scala kept wrapping the resultant KTable from
>> filter():
>>
>> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
>>
>> leading to stack overflow.
>>
>> Cheers
>>
>> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dru...@arrcus.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I’m using the org.kafka.streams.scala that was released with version
>>> 2.0.0. I’m getting a StackOverflowError as follows:
>>>
>>> java.lang.StackOverflowError
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> .
>>> .
>>> .
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>>
>>> The Scala version I’m using is 2.11.11 and the code leading to the error
>>> is as follows (particularly the .filter).
>>>
>>> val builder = new StreamsBuilder
>>>
>>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>>>
>>> val customers = args.config.keys
>>>
>>> val predicates = customers.map { customerId =>
>>> (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
>>> }.toSeq
>>>
>>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
>>>
>>> val y = Printed.toSysOut[Windowed[Key], Long]
>>>
>>> customerIdToStream.foreach { case (customerId, customerStream) =>
>>> val customerConfig = args.config(customerId)
>>> customerStream
>>> .flatMap { case (_, message) =>
>>> message.objects.map {
>>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
>>> }
>>> }
>>> .groupByKey
>>>
>>>
>> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
>>> .count()
>>> .filter { case (_, count) => count >=
>>> customerConfig.frequencyThreshold }
>>> .toStream
>>> .print(y)
>>> }
>>>
>>> Is this a bug with the new scala module related to:
>>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
>>> Or am I doing something wrong?
>>>
>>> Thanks,
>>> Druhin
>>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to