Unsubscribe

2022-03-25 Thread guoxb__...@sina.com

Unsubscribe


guoxb__...@sina.com


Unsubscribe

2022-02-09 Thread guoxb__...@sina.com
Unsubscribe



guoxb__...@sina.com


flink job log warning: This does not compromise Flink's checkpoint integrity.

2021-04-20 Thread guoxb__...@sina.com
Hi all,
 
I've noticed that my Flink job log keeps emitting the following warning:

```
2021-04-21 09:13:07,218 WARN  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
```

This has caused problems with the job's Kafka consumption. Has anyone run into this before, and how should I troubleshoot it? Any suggestions would be much appreciated.

Flink version in use: 1.11
Checkpoint interval set in the program: 30s
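
For context, with checkpointing enabled the FlinkKafkaConsumer commits offsets back to Kafka only when a checkpoint completes, so a slow commit combined with a short interval produces exactly this WARN. A hedged sketch of the two knobs usually involved (values are illustrative, not recommendations):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: lengthen the checkpoint interval so the previous offset
        // commit can finish before the next checkpoint completes.
        env.enableCheckpointing(60_000);

        // Option 2: if the consumer-group offsets stored in Kafka are only
        // used for lag monitoring, skip committing them back altogether;
        // Flink restores from its own checkpointed offsets, so exactly-once
        // is unaffected (hence the "does not compromise integrity" wording).
        // flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);
    }
}
```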



guoxb__...@sina.com


flink consuming Kafka data: open() method initialized multiple times

2021-04-09 Thread guoxb__...@sina.com
Hi:
Scenario:
I am using Flink with FlinkKafkaConsumer to consume data from Kafka and write it to MySQL. On the sink side I extend RichSinkFunction, override the open() and close() methods, and implement invoke().
My understanding:
1. open() runs only once, at program startup; I initialize the database connection in this method
2. close() also runs only once, when the program shuts down
3. invoke() runs once for each record received
Actual behavior and the problem, with env.setParallelism(1) (a verification sketch follows this list):
1. open() ran twice at program startup
2. invoke() is likewise executed twice for every incoming message
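
One way to verify how many sink instances actually exist is to log the subtask index and object identity in open() and invoke(); a minimal hedged sketch (class name hypothetical):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// open() is invoked once per parallel subtask, so this shows whether two
// sink instances were created (e.g. the sink was attached to the pipeline
// twice) or a single instance is receiving each record twice.
public class ProbeSink extends RichSinkFunction<Object> {
    @Override
    public void open(Configuration config) {
        System.out.printf("open(): subtask=%d/%d, instance=%s%n",
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks(),
                Integer.toHexString(System.identityHashCode(this)));
    }

    @Override
    public void invoke(Object value, Context context) {
        System.out.printf("invoke(): subtask=%d, instance=%s, value=%s%n",
                getRuntimeContext().getIndexOfThisSubtask(),
                Integer.toHexString(System.identityHashCode(this)), value);
    }
}
```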

Code:
Reader side:
```java
public class FlinkKafkaReader extends DataKafkaConnect {

    @Override
    protected DataStream reader(StreamExecutionEnvironment env, KafkaConfig cfg) throws JobException {
        DataStream stream = null;
        try {
            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", cfg.getBootstrapServers());
            kafkaProps.setProperty("group.id", cfg.getGroupId());
            kafkaProps.setProperty("auto.offset.reset", cfg.getOffsetReset());
            kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaProps.setProperty("enable.auto.commit", cfg.getAutoCommit());
            kafkaProps.setProperty("max.poll.interval.ms", cfg.getIntervalMs());

            // Pick a message deserializer according to the configured type
            // (note: it stays null for any other message type)
            KafkaDeserializationSchema deserializationKdl = null;
            switch (cfg.getMessageType()) {
                case "mysql":
                    deserializationKdl = new KafkaDataDeserialiaztionBinlogSchemal();
                    break;
                case "mongodb":
                    deserializationKdl = new KafkaDataDeserialiaztionOplogSchema();
                    break;
            }
            FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(
                    Arrays.asList(cfg.getTopics().split(",")),
                    deserializationKdl,
                    kafkaProps);

            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // If a start time is configured, consume from that point in time;
            // otherwise start from the committed consumer-group offsets
            String consumptionStartOffset = cfg.getConsumptionStartTime();
            if (StringUtils.isBlank(consumptionStartOffset)) {
                flinkKafkaConsumer.setStartFromGroupOffsets();
            } else {
                flinkKafkaConsumer.setStartFromTimestamp(
                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                .parse(cfg.getConsumptionStartTime())
                                .getTime());
            }
            // Set the parallelism
            env.setParallelism(1);
            //env.getCheckpointConfig().setCheckpointInterval(1000 * 30);

            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // Number of tolerable checkpoint failures
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
            stream = env.addSource(flinkKafkaConsumer)
                    .filter(new FilterFunction() {
                        @Override
                        public boolean filter(Object value) throws Exception {
                            return null != value;
                        }
                    });
        } catch (Exception e) {
            throw new JobException(e.getMessage());
        }
        return stream;
    }
}
```
Sink side:
```java
public class MysqlSink extends RichSinkFunction {
    @Override
    public void open(Configuration config) throws Exception {
        ...
    }

    @Override
    public void close() {
        ...
    }

    @Override
    public void invoke(Object obj, Context context) {
        // Business logic; this runs twice for every incoming record,
        // which is exactly my problem
        ...
    }
}
```
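
For reference, a hedged sketch of what the elided method bodies might look like for a JDBC sink with this lifecycle (URL, credentials, and SQL are hypothetical placeholders, not taken from the original code):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// One connection per parallel subtask: opened in open(), used per record
// in invoke(), released in close().
public class MysqlSinkSketch extends RichSinkFunction<String> {
    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration config) throws Exception {
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/demo", "user", "password"); // hypothetical DSN
        statement = connection.prepareStatement(
                "INSERT INTO demo_table (payload) VALUES (?)"); // hypothetical SQL
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        statement.setString(1, value);
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}
```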
 
If anyone knows the cause, please give me a pointer. Many thanks.


guoxb__...@sina.com


flink heartbeat timeout

2021-01-20 Thread guoxb__...@sina.com
Hi

Problem description:
 
I am running a streaming computation job with Flink. The program was started at 21:00 last night and looked normal at the time, with data being processed correctly. When I checked at 09:00 this morning, the job had been automatically restarted. The log shows the following error:

[The error output appears to have been attached as an image and is not preserved in the archive.]

Judging from the error, it was caused by the heartbeat timeout. From what I have read, the parameter to adjust is heartbeat.timeout; the official documentation gives the default as 50000 (ms). But changing it would require restarting the Flink service, which is not allowed in our production environment.

Questions:
1. The cause above is only a guess; I have not pinned down the actual problem. I would appreciate pointers from anyone with experience. Many thanks.
2. If I really do need to set heartbeat.timeout, how can I do it without restarting the Flink cluster? Many thanks. (A sketch follows below.)
Note:
My Flink version is 1.11.0.
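
For question 2, a hedged sketch of where the option can be set (180000 ms is an illustrative value only): heartbeat.timeout is a cluster-level option read from flink-conf.yaml at process startup, so a standing session cluster does need a restart to pick it up. A per-job deployment, however, starts fresh JobManager/TaskManager processes on each submission, so the value can be supplied at submit time, e.g. `flink run -yD heartbeat.timeout=180000 ...` in YARN per-job mode on 1.11, without touching any long-running cluster. For local testing it can also be set programmatically:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeartbeatTimeoutSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // 180s is an illustrative value, not a recommendation.
        conf.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 180_000L);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        // ... build and execute the job against this environment
    }
}
```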


guoxb__...@sina.com


flink-cdc error

2021-01-13 Thread guoxb__...@sina.com
Hi:
Hello everyone. I have run into a problem: Flink errors out while reading the MySQL binlog via CDC. The exact error is as follows:


2021-01-13 15:45:21,920 INFO  org.apache.kafka.connect.runtime.WorkerConfig [] - Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2021-01-13 15:45:21,920 INFO  org.apache.kafka.connect.runtime.WorkerConfig [] - Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2021-01-13 15:45:28,936 ERROR io.debezium.connector.mysql.SnapshotReader [] - Failed due to error: Aborting snapshot due to error when last running 'UNLOCK TABLES': io.debezium.ddl.parser.mysql.generated.MySqlParser and io.debezium.ddl.parser.mysql.generated.MySqlParser$StringDataTypeContext disagree on InnerClasses attribute
org.apache.kafka.connect.errors.ConnectException: io.debezium.ddl.parser.mysql.generated.MySqlParser and io.debezium.ddl.parser.mysql.generated.MySqlParser$StringDataTypeContext disagree on InnerClasses attribute
    at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) ~[yd-mysql-mysql.jar:?]
    at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) ~[yd-mysql-mysql.jar:?]
    at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831) ~[yd-mysql-mysql.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_192]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_192]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]



Has anyone run into a similar problem? Any advice would be greatly appreciated. Thank you very much. (A diagnostic sketch follows below.)
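
For what it's worth, "disagree on InnerClasses attribute" is a JVM-level complaint that an outer class and its inner class were loaded from two different, incompatible compilations, which usually points to two copies of the Debezium DDL parser on the classpath (e.g. one shaded into the job jar and another on the cluster classpath). A hedged diagnostic sketch that, when run on the same classpath as the job (or from inside it), prints which jar each class is actually loaded from (class names taken from the stack trace above):

```java
public class ClasspathProbe {
    public static void main(String[] args) throws Exception {
        String[] names = {
                "io.debezium.ddl.parser.mysql.generated.MySqlParser",
                "io.debezium.ddl.parser.mysql.generated.MySqlParser$StringDataTypeContext"
        };
        for (String name : names) {
            Class<?> clazz = Class.forName(name);
            // The code source reveals the jar each class was loaded from;
            // two different locations would confirm a dependency conflict.
            System.out.println(name + " -> "
                    + clazz.getProtectionDomain().getCodeSource().getLocation());
        }
    }
}
```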


guoxb__...@sina.com