之前的代码好像乱码了,我设置了一下,重新发一下,建议你 在获取consumer之后,再设置一下 consumer.setStartFromLatest();,这样设置的参考就是官网文档介绍的,这是我之前翻译的,可以看一下后边关于【Kafka Consumers 从指定位置开始消费】的解释,链接:https://www.jianshu.com/p/b753527b91a6
/** * @param env * @param topic * @param time 订阅的时间 * @return * @throws IllegalAccessException */ public static DataStreamSource<XlogStreamBasicBean> buildSource(StreamExecutionEnvironment env, String topic, Long time) throws IllegalAccessException { ParameterTool parameterTool = (ParameterTool) env.getConfig().getGlobalJobParameters(); Properties props = buildKafkaProps(parameterTool); FlinkKafkaConsumer011<XlogStreamBasicBean> consumer = new FlinkKafkaConsumer011<>( topic, new MetricSchema(), props); consumer.setStartFromLatest(); consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<XlogStreamBasicBean>() { @Override public long extractAscendingTimestamp(XlogStreamBasicBean element) { if (element == null || element.getTimestamp() == null) { return System.currentTimeMillis(); } return element.getTimestamp() - 10000; } }); return env.addSource(consumer); } } ------------------ 原始邮件 ------------------ 发件人: "苟刚"<gougang_1...@163.com>; 发送时间: 2020年4月7日(星期二) 中午11:27 收件人: "user-zh"<user-zh@flink.apache.org>; 主题: fink新增计算逻辑时kafka从头开始追平消费记录 Hello, 我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗? 我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。 flink版本:1.6.3 部分代码如下: public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args); StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env); // 处理timing数据 processTimingData(parameterTool, data); // 处理front error数据 processFrontErrorData(parameterTool, data); // 处理img error数据 processImgLoadErrorData(parameterTool, data); env.execute("xlog compute"); } kafka的连接参数配置: public static Properties buildKafkaProps(ParameterTool parameterTool) { Properties props = parameterTool.getProperties(); props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS)); props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT)); props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID)); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); return props; } -- Best Wishes Galen.K