[ https://issues.apache.org/jira/browse/KAFKA-7250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16571800#comment-16571800 ]
ASF GitHub Bot commented on KAFKA-7250: --------------------------------------- guozhangwang closed pull request #5468: KAFKA-7250: fix transform function in scala DSL to accept TranformerS… URL: https://github.com/apache/kafka/pull/5468 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala index 4a4c3b0ee44..65ea4903326 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala @@ -105,4 +105,10 @@ object FunctionConversions { override def apply(): VA = f() } } + + implicit class TransformerSupplierFromFunction[K, V, VO](val f: () => Transformer[K, V, VO]) extends AnyVal { + def asTransformerSupplier: TransformerSupplier[K, V, VO] = new TransformerSupplier[K, V, VO] { + override def get(): Transformer[K, V, VO] = f() + } + } } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 8f6aab86e2e..c02939aeab7 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -22,7 +22,7 @@ package kstream import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _} -import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, TopicNameExtractor} +import 
org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor} import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.FunctionConversions._ @@ -284,33 +284,21 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { /** * Transform each record of the input stream into zero or more records in the output stream (both key and value type * can be altered arbitrarily). - * A `Transformer` is applied to each input record and computes zero or more output records. In order to assign a - * state, the state must be created and registered beforehand via stores added via `addStateStore` or `addGlobalStore` + * A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input record + * and computes zero or more output records. + * In order to assign a state, the state must be created and registered + * beforehand via stores added via `addStateStore` or `addGlobalStore` * before they can be connected to the `Transformer` * - * @param transformer the `Transformer` instance + * @param transformerSupplier the `TransformerSuplier` that generates `Transformer` * @param stateStoreNames the names of the state stores used by the processor * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transform` */ - def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], stateStoreNames: String*): KStream[K1, V1] = { - val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] = - new TransformerSupplier[K, V, KeyValue[K1, V1]] { - override def get(): Transformer[K, V, KeyValue[K1, V1]] = - new Transformer[K, V, KeyValue[K1, V1]] { - override def transform(key: K, value: V): KeyValue[K1, V1] = - transformer.transform(key, value) match { - case (k1, v1) => KeyValue.pair(k1, v1) - case _ => null - } - - override def init(context: ProcessorContext): Unit = transformer.init(context) - - override 
def close(): Unit = transformer.close() - } - } - inner.transform(transformerSupplierJ, stateStoreNames: _*) - } + def transform[K1, V1](transformerSupplier: () => Transformer[K, V, KeyValue[K1, V1]], + stateStoreNames: String*): KStream[K1, V1] = + inner.transform(transformerSupplier.asTransformerSupplier, stateStoreNames: _*) + /** * Transform the value of each input record into a new value (with possible new type) of the output record. diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index f04ec5dcb04..194abf5e3ba 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -31,6 +31,8 @@ import ImplicitConversions._ import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _} import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _} +import org.apache.kafka.streams.processor.ProcessorContext + import collection.JavaConverters._ /** @@ -194,4 +196,61 @@ class TopologyTest extends JUnitSuite { // should match assertEquals(getTopologyScala(), getTopologyJava()) } + + @Test def shouldBuildIdenticalTopologyInJavaNScalaTransform() = { + + // build the Scala topology + def getTopologyScala(): TopologyDescription = { + + import Serdes._ + + val streamBuilder = new StreamsBuilder + val textLines = streamBuilder.stream[String, String](inputTopic) + + val _: KTable[String, Long] = + textLines + .transform(() => new Transformer[String, String, KeyValue[String, String]] { + override def init(context: ProcessorContext): Unit = Unit + override def transform(key: String, value: String): KeyValue[String, String] = + new KeyValue(key, value.toLowerCase) + override def close(): Unit = Unit + }) + .groupBy((k, v) => v) + .count() + + 
streamBuilder.build().describe() + } + + // build the Java topology + def getTopologyJava(): TopologyDescription = { + + val streamBuilder = new StreamsBuilderJ + val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic) + + val lowered: KStreamJ[String, String] = textLines + .transform(new TransformerSupplier[String, String, KeyValue[String, String]] { + override def get(): Transformer[String, String, KeyValue[String, String]] = new Transformer[String, String, KeyValue[String, String]] { + override def init(context: ProcessorContext): Unit = Unit + + override def transform(key: String, value: String): KeyValue[String, String] = + new KeyValue(key, value.toLowerCase) + + override def close(): Unit = Unit + } + }) + + val grouped: KGroupedStreamJ[String, String] = lowered.groupBy { + new KeyValueMapper[String, String, String] { + def apply(k: String, v: String): String = v + } + } + + val wordCounts: KTableJ[String, java.lang.Long] = grouped.count() + + streamBuilder.build().describe() + } + + // should match + assertEquals(getTopologyScala(), getTopologyJava()) + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka-Streams-Scala DSL transform shares transformer instance > ------------------------------------------------------------- > > Key: KAFKA-7250 > URL: https://issues.apache.org/jira/browse/KAFKA-7250 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.0.0 > Reporter: Michal > Assignee: Michal Dziemianko > Priority: Major > Labels: scala > > The new Kafka Streams Scala DSL provides a transform function with the following > signature > {{def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], > stateStoreNames: String*): KStream[K1, V1]}} > The provided 'transformer' (will refer to it as scala-transformer) instance > is then used to derive a Java Transformer instance and in turn a > TransformerSupplier that is passed to the underlying Java DSL. However, that > causes all the tasks to share the same instance of the scala-transformer. > This introduces all sorts of issues. The simplest way to reproduce is to > implement the simplest transformer of the following shape: > {{.transform(new Transformer[String, String, (String, String)] {}} > var context: ProcessorContext = _ > {{ def init(pc: ProcessorContext) = \{ context = pc}}} > {{ def transform(k: String, v: String): (String, String) = {}} > context.timestamp() > ... > {{ }}}{{})}} > The call to timestamp() will die with the exception "This should not happen as > timestamp() should only be called while a record is processed" due to the record > context not being set - the update of the record context was actually > performed, but due to the shared nature of the scala-transformer the local > reference to the processor context is pointing to the one of the last > initialized task rather than the current task. 
> The solution is to accept a function in the following manner: > def transform[K1, V1](getTransformer: () => Transformer[K, V, (K1, V1)], > stateStoreNames: String*): KStream[K1, V1] > or a TransformerSupplier - as the transformValues DSL function does. -- This message was sent by Atlassian JIRA (v7.6.3#76005)