Hi Mich,

How did you setup your local Kafka cluster, did you produce any message to
it? Seems like you are using a standard local Kafka cluster setup for
testing:
"bootstrap.servers", "localhost:9092" "zookeeper.connect", "localhost:2181"

so probably you need to manually produce some data, probably
using kafka-console-producer [1]

Another thing is since you are executing it in the scala shell, it might be
easier for you to do
    val stream = env
                 .addSource(new FlinkKafkaConsumer09[String]("md", new
SimpleStringSchema(), properties))
                 .writeAsText("some_file_path")
so that the produced result won't get buried in a huge list of console
output messages.

--
Rong

[1]: https://kafka.apache.org/quickstart#quickstart_send

On Sat, Jun 30, 2018 at 8:06 AM zhangminglei <18717838...@163.com> wrote:

> Please try new FlinkKafkaConsumer09[String]("md", new
> SimpleStringSchema(), properties).setStartFromEarliest() and try again.
>
> Cheers
> Minglei.
>
>
> 在 2018年6月30日,下午10:08,Mich Talebzadeh <mich.talebza...@gmail.com> 写道:
>
>
> Hi,
>
> I have a streaming topic called "md" that displays test market data.
>
> I have written a simple program to stream data in via kafka into flinl.
>
> Flink version 1.5
> Kafka version 2.12
>
> This is the sample program in scala that compiles ok
> in start-scala-shell.sh
>
> import java.util.Properties
> import java.util.Arrays
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.datastream.DataStream
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.streaming.util.serialization.DeserializationSchema
> //import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> object Main {
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "localhost:9092")
>     properties.setProperty("zookeeper.connect", "localhost:2181")
>     properties.setProperty("group.id", "sampleScala")
>     val stream = env
>                  .addSource(new FlinkKafkaConsumer09[String]("md", new
> SimpleStringSchema(), properties))
>                  .print()
>     env.execute("Flink Kafka Example")
>   }
> }
>
> warning: there was one deprecation warning; re-run with -deprecation for
> details
> defined object Main
>
> But I do not see any streaming output.
>
> A naïve question. How do I execute the above compiled object in this shell?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>

Reply via email to