This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new 9ec64a8 KAFKA-7027: Add an overload build method in scala (#6373) 9ec64a8 is described below commit 9ec64a8a838f83bfc5e3011eec29104424e86679 Author: Massimo Siani <massimosi...@users.noreply.github.com> AuthorDate: Fri Mar 15 15:56:48 2019 +0100 KAFKA-7027: Add an overload build method in scala (#6373) The Java API allows passing a Properties object to StreamsBuilder#build (e.g., to enable topology optimization), but the Scala API did not. This commit adds a Scala overload that simply delegates to the underlying Java implementation. Reviewers: John Roesler <j...@confluent.io>, Bill Bejeck <bbej...@gmail.com> --- .../kafka/streams/scala/StreamsBuilder.scala | 11 ++ .../apache/kafka/streams/scala/TopologyTest.scala | 175 ++++++++++++++++++++- 2 files changed, 184 insertions(+), 2 deletions(-) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala index 8c5a9b3..4941859 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala @@ -19,6 +19,7 @@ */ package org.apache.kafka.streams.scala +import java.util.Properties import java.util.regex.Pattern import org.apache.kafka.streams.kstream.GlobalKTable @@ -178,4 +179,14 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier) def build(): Topology = inner.build() + + /** + * Returns the `Topology` that represents the specified processing logic and accepts + * a `Properties` instance used to indicate whether to optimize topology or not. 
+ * + * @param props the `Properties` used for building possibly optimized topology + * @return the `Topology` that represents the specified processing logic + * @see `org.apache.kafka.streams.StreamsBuilder#build` + */ + def build(props: Properties): Topology = inner.build(props) } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index a826401..afa4ae6 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -19,20 +19,31 @@ */ package org.apache.kafka.streams.scala +import java.time.Duration +import java.util +import java.util.{Locale, Properties} import java.util.regex.Pattern +import org.apache.kafka.common.serialization.{Serdes => SerdesJ} import org.apache.kafka.streams.kstream.{ + Aggregator, + ForeachAction, + Initializer, + JoinWindows, KeyValueMapper, + Predicate, Reducer, Transformer, TransformerSupplier, ValueJoiner, ValueMapper, + Joined => JoinedJ, KGroupedStream => KGroupedStreamJ, KStream => KStreamJ, - KTable => KTableJ + KTable => KTableJ, + Materialized => MaterializedJ } -import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, ProcessorSupplier} import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.Serdes._ import org.apache.kafka.streams.scala.kstream._ @@ -268,4 +279,164 @@ class TopologyTest extends JUnitSuite { // should match assertEquals(getTopologyScala(), getTopologyJava()) } + + @Test def shouldBuildIdenticalTopologyInJavaNScalaProperties(): Unit = { + + val props = new Properties() + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE) + + val propsNoOptimization = new Properties() + 
propsNoOptimization.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION) + + val AGGREGATION_TOPIC = "aggregationTopic" + val REDUCE_TOPIC = "reduceTopic" + val JOINED_TOPIC = "joinedTopic" + + // build the Scala topology + def getTopologyScala: StreamsBuilder = { + + val aggregator = (_: String, v: String, agg: Int) => agg + v.length + val reducer = (v1: String, v2: String) => v1 + ":" + v2 + val processorValueCollector: util.List[String] = new util.ArrayList[String] + + val builder: StreamsBuilder = new StreamsBuilder + + val sourceStream: KStream[String, String] = + builder.stream(inputTopic)(Consumed.`with`(Serdes.String, Serdes.String)) + + val mappedStream: KStream[String, String] = + sourceStream.map((k: String, v: String) => (k.toUpperCase(Locale.getDefault), v)) + mappedStream + .filter((k: String, _: String) => k == "B") + .mapValues((v: String) => v.toUpperCase(Locale.getDefault)) + .process(() => new SimpleProcessor(processorValueCollector)) + + val stream2 = mappedStream.groupByKey + .aggregate(0)(aggregator)(Materialized.`with`(Serdes.String, Serdes.Integer)) + .toStream + stream2.to(AGGREGATION_TOPIC)(Produced.`with`(Serdes.String, Serdes.Integer)) + + // adding operators for case where the repartition node is further downstream + val stream3 = mappedStream + .filter((_: String, _: String) => true) + .peek((k: String, v: String) => System.out.println(k + ":" + v)) + .groupByKey + .reduce(reducer)(Materialized.`with`(Serdes.String, Serdes.String)) + .toStream + stream3.to(REDUCE_TOPIC)(Produced.`with`(Serdes.String, Serdes.String)) + + mappedStream + .filter((k: String, _: String) => k == "A") + .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))( + Joined.`with`(Serdes.String, Serdes.String, Serdes.Integer) + ) + .to(JOINED_TOPIC) + + mappedStream + .filter((k: String, _: String) => k == "A") + .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, 
JoinWindows.of(Duration.ofMillis(5000)))( + Joined.`with`(Serdes.String, Serdes.String, Serdes.String) + ) + .to(JOINED_TOPIC) + + builder + } + + // build the Java topology + def getTopologyJava: StreamsBuilderJ = { + + val keyValueMapper: KeyValueMapper[String, String, KeyValue[String, String]] = + new KeyValueMapper[String, String, KeyValue[String, String]] { + override def apply(key: String, value: String): KeyValue[String, String] = + KeyValue.pair(key.toUpperCase(Locale.getDefault), value) + } + val initializer: Initializer[Integer] = new Initializer[Integer] { + override def apply(): Integer = 0 + } + val aggregator: Aggregator[String, String, Integer] = new Aggregator[String, String, Integer] { + override def apply(key: String, value: String, aggregate: Integer): Integer = aggregate + value.length + } + val reducer: Reducer[String] = new Reducer[String] { + override def apply(v1: String, v2: String): String = v1 + ":" + v2 + } + val valueMapper: ValueMapper[String, String] = new ValueMapper[String, String] { + override def apply(v: String): String = v.toUpperCase(Locale.getDefault) + } + val processorValueCollector = new util.ArrayList[String] + val processorSupplier: ProcessorSupplier[String, String] = new ProcessorSupplier[String, String] { + override def get() = new SimpleProcessor(processorValueCollector) + } + val valueJoiner2: ValueJoiner[String, Integer, String] = new ValueJoiner[String, Integer, String] { + override def apply(value1: String, value2: Integer): String = value1 + ":" + value2.toString + } + val valueJoiner3: ValueJoiner[String, String, String] = new ValueJoiner[String, String, String] { + override def apply(value1: String, value2: String): String = value1 + ":" + value2.toString + } + + val builder = new StreamsBuilderJ + + val sourceStream = builder.stream(inputTopic, Consumed.`with`(Serdes.String, Serdes.String)) + + val mappedStream: KStreamJ[String, String] = + sourceStream.map(keyValueMapper) + mappedStream + .filter(new 
Predicate[String, String] { + override def test(key: String, value: String): Boolean = key == "B" + }) + .mapValues[String](valueMapper) + .process(processorSupplier) + + val stream2 = mappedStream.groupByKey + .aggregate(initializer, aggregator, MaterializedJ.`with`(Serdes.String, SerdesJ.Integer)) + .toStream + stream2.to(AGGREGATION_TOPIC, Produced.`with`(Serdes.String, SerdesJ.Integer)) + + // adding operators for case where the repartition node is further downstream + val stream3 = mappedStream + .filter(new Predicate[String, String] { + override def test(k: String, v: String) = true + }) + .peek(new ForeachAction[String, String] { + override def apply(k: String, v: String) = System.out.println(k + ":" + v) + }) + .groupByKey + .reduce(reducer, MaterializedJ.`with`(Serdes.String, Serdes.String)) + .toStream + stream3.to(REDUCE_TOPIC, Produced.`with`(Serdes.String, Serdes.String)) + + mappedStream + .filter(new Predicate[String, String] { + override def test(key: String, value: String): Boolean = key == "A" + }) + .join(stream2, + valueJoiner2, + JoinWindows.of(Duration.ofMillis(5000)), + JoinedJ.`with`(Serdes.String, Serdes.String, SerdesJ.Integer)) + .to(JOINED_TOPIC) + + mappedStream + .filter(new Predicate[String, String] { + override def test(key: String, value: String): Boolean = key == "A" + }) + .join(stream3, + valueJoiner3, + JoinWindows.of(Duration.ofMillis(5000)), + JoinedJ.`with`(Serdes.String, Serdes.String, SerdesJ.String)) + .to(JOINED_TOPIC) + + builder + } + + assertNotEquals(getTopologyScala.build(props).describe.toString, + getTopologyScala.build(propsNoOptimization).describe.toString) + assertEquals(getTopologyScala.build(propsNoOptimization).describe.toString, + getTopologyJava.build(propsNoOptimization).describe.toString) + assertEquals(getTopologyScala.build(props).describe.toString, getTopologyJava.build(props).describe.toString) + } + + private class SimpleProcessor private[TopologyTest] (val valueList: util.List[String]) + extends 
AbstractProcessor[String, String] { + override def process(key: String, value: String): Unit = + valueList.add(value) + } }