Hi Mich, How did you setup your local Kafka cluster, did you produce any message to it? Seems like you are using a standard local Kafka cluster setup for testing: "bootstrap.servers", "localhost:9092" "zookeeper.connect", "localhost:2181"
so probably you need to manually produce some data, probably using kafka-console-producer [1] Another thing is since you are executing it in the scala shell, it might be easier for you to do val stream = env .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties)) .writeAsText("some_file_path") so that the produced result won't get buried in a huge list of console output messages. -- Rong [1]: https://kafka.apache.org/quickstart#quickstart_send On Sat, Jun 30, 2018 at 8:06 AM zhangminglei <18717838...@163.com> wrote: > Please try new FlinkKafkaConsumer09[String]("md", new > SimpleStringSchema(), properties).setStartFromEarliest() and try again. > > Cheers > Minglei. > > > 在 2018年6月30日,下午10:08,Mich Talebzadeh <mich.talebza...@gmail.com> 写道: > > > Hi, > > I have a streaming topic called "md" that displays test market data. > > I have written a simple program to stream data in via kafka into flinl. > > Flink version 1.5 > Kafka version 2.12 > > This is the sample program in scala that compiles ok > in start-scala-shell.sh > > import java.util.Properties > import java.util.Arrays > import org.apache.flink.api.common.functions.MapFunction > import org.apache.flink.api.java.utils.ParameterTool > import org.apache.flink.streaming.api.datastream.DataStream > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 > import org.apache.flink.streaming.util.serialization.SimpleStringSchema > import org.apache.flink.streaming.util.serialization.DeserializationSchema > //import org.apache.flink.streaming.api.scala._ > import org.apache.flink.streaming.util.serialization.SimpleStringSchema > object Main { > def main(args: Array[String]) { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val properties = new Properties() > properties.setProperty("bootstrap.servers", "localhost:9092") > properties.setProperty("zookeeper.connect", "localhost:2181") > properties.setProperty("group.id", "sampleScala") > val stream = env > .addSource(new FlinkKafkaConsumer09[String]("md", new > SimpleStringSchema(), properties)) > .print() > env.execute("Flink Kafka Example") > } > } > > warning: there was one deprecation warning; re-run with -deprecation for > details > defined object Main > > But I do not see any streaming output. > > A naïve question. How do I execute the above compiled object in this shell? > > Thanks > > Dr Mich Talebzadeh > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > http://talebzadehmich.wordpress.com > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > >