hi,    ????FlinkKafkaConsumer????????????????  
  ????????????????????????
//---------------------------
 val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
 Env.setRestartStrategy(RestartStrategies.noRestart())
 val consumerProps = new Properties()
 consumerProps.put("bootstrap.servers", brokers)
 consumerProps.put("group.id", "test1234")

 val consumer = new FlinkKafkaConsumer[String](topic,new 
KafkaStringSchema,consumerProps).setStartFromLatest()
 Env.addSource(consumer).print()
 
Env.execute()//-----------------------------------??????????????????????????????????????????topic??group.id????????????????topic????????????????????????????????????????????????????????????kafka
 ??consumer 
group????????????????????????????????????????????????????????????????????????KafkaConsumer??????????????????????????topic????????????????????????????????????????????????????flink1.11flink-connector-kafka_2.11
 ????

回复