Pardon - I missed the implicit config (which is used by withRunningKafka). Without your manual message production, was there any indication in broker log that it received message(s) ?
Thanks On Thu, Apr 19, 2018 at 6:31 AM, Chauvet, Thomas <thomas.chau...@ercom.fr> wrote: > Hi, > > > > withRunningKafka launch a kafka broker. This is one of the advantage of > this library. > > > > I test to consume / produce messages with kafka command line, and it seems > alright. > > > > Thanks > > > > *De :* Ted Yu [mailto:yuzhih...@gmail.com] > *Envoyé :* jeudi 19 avril 2018 15:28 > *À :* Chauvet, Thomas <thomas.chau...@ercom.fr> > *Objet :* Re: Flink / Kafka unit testing with scalatest-embedded-kafka > > > > Looking at your code, Kafka broker was not started. > > > > Was there running broker on localhost ? > > > > Cheers > > > > On Thu, Apr 19, 2018 at 6:23 AM, Chauvet, Thomas <thomas.chau...@ercom.fr> > wrote: > > Hi, > > > > I would like to « unit test » a job flink with Kafka as source (and Sink). > I am trying to use the library scalatest-embedded-kafka to simulate a Kafka > for my test. > > > > For example, I would like to get data (string stream) from Kafka, convert > it intro uppercase and put it into another topic. > > > > Now, I am just trying to use Flink’s kafka consumer to read into a topic > (with embedded kafka). > > > > Here is the code for example : > > > > ```scala > > > > import java.util.Properties > > > > import org.apache.flink.streaming.api.scala._ > > import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} > > import org.apache.flink.api.common.serialization.SimpleStringSchema > > import org.apache.flink.core.fs.FileSystem.WriteMode > > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 > > import org.scalatest.{Matchers, WordSpec} > > import scala.util.Random > > > > object SimpleFlinkKafkaTest { > > SimpleFlinkKafkaTest > > val kafkaPort = 9092 > > val zooKeeperPort = 2181 > > > > val groupId = Random.nextInt(1000000).toString > > val props = new Properties() > > props.put("bootstrap.servers", "localhost:9092") > > props.put("zookeeper.connect", "localhost:2181") > > props.put("auto.offset.reset", "earliest") > > props.put("group.id", groupId) > > props.put("key.deserializer", "org.apache.kafka.common.serialization. > StringDeserializer") > > props.put("value.deserializer", "org.apache.kafka.common.serialization. > StringDeserializer") > > > > val propsMap = Map( > > "bootstrap.servers" -> "localhost:9092", > > "zookeeper.connect" -> "localhost:2181", > > "auto.offset.reset" -> "earliest", > > "group.id" -> groupId, > > "key.deserializer" -> "org.apache.kafka.common.serialization. > StringDeserializer", > > "value.deserializer" -> "org.apache.kafka.common.serialization. > StringDeserializer" > > ) > > > > val inputString = "mystring" > > val expectedString = "MYSTRING" > > > > } > > > > class SimpleFlinkKafkaTest extends WordSpec with Matchers with > EmbeddedKafka { > > > > "runs with embedded kafka" should { > > > > "work" in { > > > > implicit val config = EmbeddedKafkaConfig( > > kafkaPort = SimpleFlinkKafkaTest.kafkaPort, > > zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort, > > customConsumerProperties = SimpleFlinkKafkaTest.propsMap > > ) > > > > withRunningKafka { > > > > publishStringMessageToKafka("input-topic", SimpleFlinkKafkaTest. > inputString) > > > > val env = StreamExecutionEnvironment.getExecutionEnvironment > > env.setParallelism(1) > > > > val kafkaConsumer = new FlinkKafkaConsumer011( > > "input-topic", > > new SimpleStringSchema, > > SimpleFlinkKafkaTest.props > > ) > > > > val inputStream = env.addSource(kafkaConsumer) > > > > val outputStream = inputStream.map { msg => > > msg.toUpperCase > > } > > > > outputStream.writeAsText("test.csv", WriteMode.OVERWRITE) > > > > env.execute() > > > > consumeFirstStringMessageFrom("output-topic") shouldEqual > SimpleFlinkKafkaTest.expectedString > > > > } > > } > > } > > } > > ``` > > > > The flink process si running but nothing happen. I try ot write into a > text file to see any output but there is nothing into the file. > > > > Any idea ? Does anybody use this library to test a Flink Job using Kafka ? > > > > Thanks in advance, > > > > Thomas > > >