[ https://issues.apache.org/jira/browse/KAFKA-9011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949002#comment-16949002 ]
Alex Kokachev commented on KAFKA-9011: -------------------------------------- I came up with the code below. If it's acceptable, happy to raise a PR. I'm fairly new to Scala, so open to any feedback. KStream.scala: {noformat} def flatTransform[K1, V1](transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]], stateStoreNames: String*): KStream[K1, V1] = inner.flatTransform(transformerSupplier.asIterable, stateStoreNames: _*){noformat} FucntionsCompatConversions.scala: {code:java} implicit class IterableTransformerSupplier[K, V, VO](val supplier : TransformerSupplier[K, V, Iterable[VO]] ) extends AnyVal { def asIterable: TransformerSupplier[K, V, JIterable[VO]] = new TransformerSupplier[K, V, JIterable[VO]] { override def get(): Transformer[K, V, JIterable[VO]] = { new Transformer[K, V, JIterable[VO]] { override def transform(key: K, value: V): JIterable[VO] = supplier.get().transform(key, value).asJava override def init(context: ProcessorContext): Unit = supplier.get().init(context) override def close(): Unit = supplier.get().close() } } } } {code} > Add KStream#flatTransform and KStream#flatTransformValues to Scala API > ---------------------------------------------------------------------- > > Key: KAFKA-9011 > URL: https://issues.apache.org/jira/browse/KAFKA-9011 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 2.3.0 > Reporter: Alex Kokachev > Priority: Major > Labels: scala, streams > > Part of KIP-313: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-313%3A+Add+KStream.flatTransform+and+KStream.flatTransformValues] > -- This message was sent by Atlassian Jira (v8.3.4#803005)