hi, ????FlinkKafkaConsumer???????????????? ????????????????????????
//---------------------------
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
Env.setRestartStrategy(RestartStrategies.noRestart())
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", brokers)
consumerProps.put("group.id", "test1234")
val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest()
Env.addSource(consumer).print()
Env.execute()
//-----------------------------------
??????????????????????????????????????????topic??group.id????????????????topic????????????????????????????????????????????????????????????kafka ??consumer group????????????????????????????????????????????????????????????????????????KafkaConsumer??????????????????????????topic????????????????????????????????????????????????????flink1.11flink-connector-kafka_2.11 ????