[ 
https://issues.apache.org/jira/browse/KAFKA-7250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16571800#comment-16571800
 ] 

ASF GitHub Bot commented on KAFKA-7250:
---------------------------------------

guozhangwang closed pull request #5468: KAFKA-7250: fix transform function in 
scala DSL to accept TranformerS…
URL: https://github.com/apache/kafka/pull/5468
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
index 4a4c3b0ee44..65ea4903326 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
@@ -105,4 +105,10 @@ object FunctionConversions {
       override def apply(): VA = f()
     }
   }
+
+  implicit class TransformerSupplierFromFunction[K, V, VO](val f: () => 
Transformer[K, V, VO]) extends AnyVal {
+    def asTransformerSupplier: TransformerSupplier[K, V, VO] = new 
TransformerSupplier[K, V, VO] {
+      override def get(): Transformer[K, V, VO] = f()
+    }
+  }
 }
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 8f6aab86e2e..c02939aeab7 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -22,7 +22,7 @@ package kstream
 
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
-import org.apache.kafka.streams.processor.{Processor, ProcessorContext, 
ProcessorSupplier, TopicNameExtractor}
+import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, 
TopicNameExtractor}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
@@ -284,33 +284,21 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   /**
    * Transform each record of the input stream into zero or more records in 
the output stream (both key and value type
    * can be altered arbitrarily).
-   * A `Transformer` is applied to each input record and computes zero or more 
output records. In order to assign a
-   * state, the state must be created and registered beforehand via stores 
added via `addStateStore` or `addGlobalStore`
+   * A `Transformer` (provided by the given `TransformerSupplier`) is applied 
to each input record
+   * and computes zero or more output records.
+   * In order to assign a state, the state must be created and registered
+   * beforehand via stores added via `addStateStore` or `addGlobalStore`
    * before they can be connected to the `Transformer`
    *
-   * @param transformer the `Transformer` instance
+   * @param transformerSupplier the `TransformerSuplier` that generates 
`Transformer`
    * @param stateStoreNames     the names of the state stores used by the 
processor
    * @return a [[KStream]] that contains more or less records with new key and 
value (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transform`
    */
-  def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], 
stateStoreNames: String*): KStream[K1, V1] = {
-    val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] =
-      new TransformerSupplier[K, V, KeyValue[K1, V1]] {
-        override def get(): Transformer[K, V, KeyValue[K1, V1]] =
-          new Transformer[K, V, KeyValue[K1, V1]] {
-            override def transform(key: K, value: V): KeyValue[K1, V1] =
-              transformer.transform(key, value) match {
-                case (k1, v1) => KeyValue.pair(k1, v1)
-                case _        => null
-              }
-
-            override def init(context: ProcessorContext): Unit = 
transformer.init(context)
-
-            override def close(): Unit = transformer.close()
-          }
-      }
-    inner.transform(transformerSupplierJ, stateStoreNames: _*)
-  }
+  def transform[K1, V1](transformerSupplier: () => Transformer[K, V, 
KeyValue[K1, V1]],
+                        stateStoreNames: String*): KStream[K1, V1] =
+    inner.transform(transformerSupplier.asTransformerSupplier, 
stateStoreNames: _*)
+
 
   /**
    * Transform the value of each input record into a new value (with possible 
new type) of the output record.
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index f04ec5dcb04..194abf5e3ba 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -31,6 +31,8 @@ import ImplicitConversions._
 
 import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
 import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => 
KStreamJ, KGroupedStream => KGroupedStreamJ, _}
+import org.apache.kafka.streams.processor.ProcessorContext
+
 import collection.JavaConverters._
 
 /**
@@ -194,4 +196,61 @@ class TopologyTest extends JUnitSuite {
     // should match
     assertEquals(getTopologyScala(), getTopologyJava())
   }
+
+  @Test def shouldBuildIdenticalTopologyInJavaNScalaTransform() = {
+
+    // build the Scala topology
+    def getTopologyScala(): TopologyDescription = {
+
+      import Serdes._
+
+      val streamBuilder = new StreamsBuilder
+      val textLines = streamBuilder.stream[String, String](inputTopic)
+
+      val _: KTable[String, Long] =
+        textLines
+          .transform(() => new Transformer[String, String, KeyValue[String, 
String]] {
+              override def init(context: ProcessorContext): Unit = Unit
+              override def transform(key: String, value: String): 
KeyValue[String, String] =
+                new KeyValue(key, value.toLowerCase)
+              override def close(): Unit = Unit
+          })
+          .groupBy((k, v) => v)
+          .count()
+
+      streamBuilder.build().describe()
+    }
+
+    // build the Java topology
+    def getTopologyJava(): TopologyDescription = {
+
+      val streamBuilder = new StreamsBuilderJ
+      val textLines: KStreamJ[String, String] = streamBuilder.stream[String, 
String](inputTopic)
+
+      val lowered: KStreamJ[String, String] = textLines
+        .transform(new TransformerSupplier[String, String, KeyValue[String, 
String]] {
+        override def get(): Transformer[String, String, KeyValue[String, 
String]] = new Transformer[String, String, KeyValue[String, String]] {
+          override def init(context: ProcessorContext): Unit = Unit
+
+          override def transform(key: String, value: String): KeyValue[String, 
String] =
+            new KeyValue(key, value.toLowerCase)
+
+          override def close(): Unit = Unit
+        }
+      })
+
+      val grouped: KGroupedStreamJ[String, String] = lowered.groupBy {
+        new KeyValueMapper[String, String, String] {
+          def apply(k: String, v: String): String = v
+        }
+      }
+
+      val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
+
+      streamBuilder.build().describe()
+    }
+
+    // should match
+    assertEquals(getTopologyScala(), getTopologyJava())
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka-Streams-Scala DSL transform shares transformer instance
> -------------------------------------------------------------
>
>                 Key: KAFKA-7250
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7250
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Michal
>            Assignee: Michal Dziemianko
>            Priority: Major
>              Labels: scala
>
> The new Kafka Streams Scala DSL provides a transform function with the following
> signature:
> {{def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], 
> stateStoreNames: String*): KStream[K1, V1]}}
> The provided 'transformer' instance (referred to below as the scala-transformer)
> is then used to derive a Java Transformer instance and, in turn, a
> TransformerSupplier that is passed to the underlying Java DSL. However, this
> causes all tasks to share the same instance of the scala-transformer.
> This introduces all sorts of issues. The simplest way to reproduce it is to
> implement a minimal transformer of the following shape:
> {code}
> .transform(new Transformer[String, String, (String, String)] {
>   var context: ProcessorContext = _
>   def init(pc: ProcessorContext) = { context = pc }
>   def transform(k: String, v: String): (String, String) = {
>     context.timestamp()
>     ...
>   }
> })
> {code}
> The call to timestamp() will fail with the exception "This should not happen as
> timestamp() should only be called while a record is processed" because the record
> context is not set. The record-context update was actually performed, but
> because the scala-transformer is shared, its local reference to the processor
> context points to the context of the last initialized task rather than that of
> the current task.
> The solution is to accept a function in the following manner:
> def transform[K1, V1](getTransformer: () => Transformer[K, V, (K1, V1)], 
> stateStoreNames: String*): KStream[K1, V1]
> or a TransformerSupplier, as the transformValues DSL function does.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to