之前的代码好像乱码了,我设置了一下,重新发一下,建议你 
在获取consumer之后,再设置一下 consumer.setStartFromLatest();,这样设置的参考就是官网文档介绍的,这是我之前翻译的,可以看一下后边关于【Kafka
 Consumers 从指定位置开始消费】的解释,链接:https://www.jianshu.com/p/b753527b91a6



 /**
     * @param env
     * @param topic
     * @param time  订阅的时间
     * @return
     * @throws IllegalAccessException
     */
&nbsp; &nbsp; public static DataStreamSource<XlogStreamBasicBean&gt; 
buildSource(StreamExecutionEnvironment env, String topic, Long time) throws 
IllegalAccessException {
&nbsp; &nbsp; &nbsp; &nbsp; ParameterTool parameterTool = (ParameterTool) 
env.getConfig().getGlobalJobParameters();
&nbsp; &nbsp; &nbsp; &nbsp; Properties props = buildKafkaProps(parameterTool);
&nbsp; &nbsp; &nbsp; &nbsp; FlinkKafkaConsumer011<XlogStreamBasicBean&gt; 
consumer = new FlinkKafkaConsumer011<&gt;(
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; topic,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new MetricSchema(),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; props);


&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;consumer.setStartFromLatest();


&nbsp; &nbsp; &nbsp; &nbsp; consumer.assignTimestampsAndWatermarks(new 
AscendingTimestampExtractor<XlogStreamBasicBean&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public long 
extractAscendingTimestamp(XlogStreamBasicBean element) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (element == null || 
element.getTimestamp() == null) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return 
System.currentTimeMillis();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return 
element.getTimestamp() - 10000;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; });
&nbsp; &nbsp; &nbsp; &nbsp; return env.addSource(consumer);
&nbsp; &nbsp; }


}





------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"苟刚"<gougang_1...@163.com&gt;;
发送时间:&nbsp;2020年4月7日(星期二) 中午11:27
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;fink新增计算逻辑时kafka从头开始追平消费记录



Hello,
 
我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?
 我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。


flink版本:1.6.3

部分代码如下:

public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
 StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);

 DataStreamSource<XlogStreamBasicBean&gt; data = KafkaTools.buildSource(env);
// 处理timing数据
processTimingData(parameterTool, data);
// 处理front error数据
processFrontErrorData(parameterTool, data);
// 处理img error数据
processImgLoadErrorData(parameterTool, data);
 env.execute("xlog compute");
}




kafka的连接参数配置:
public static Properties buildKafkaProps(ParameterTool parameterTool) {
 Properties props = parameterTool.getProperties();
 props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, 
DEFAULT_KAFKA_BROKERS));
 props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, 
DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
 props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, 
DEFAULT_KAFKA_GROUP_ID));
 props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
 props.put("auto.offset.reset", "latest");
return props;
}







--

Best Wishes
 Galen.K

回复