chenchuangchuang created FLINK-17638:
----------------------------------------
Summary: FlinkKafkaConsumerBase restored from an empty state is forced
to consume from the earliest offset
Key: FLINK-17638
URL: https://issues.apache.org/jira/browse/FLINK-17638
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.10.0, 1.9.3, 1.9.0
Environment: Flink 1.9.0
kafka 1.1.0
jdk 1.8
Reporter: chenchuangchuang
My goal and data look like this:
# I need to count the number of posts each user created in the last 30 days in my system.
# The full, real-time data is in MySQL.
# I can get the incremental MySQL binlog from Kafka 1.1.1 (it only stores the
last 7 days of binlog); the topic name is "binlog_post_topic".
# So I have to combine the MySQL data and the binlog data.
I do it this way:
# First, I write a snapshot of the MySQL data to a Kafka topic in order of
create_time (topic name "init-post-topic"), and consume "init-post-topic"
as a Flink DataStream with SlidingEventTimeWindows.
# Second, after the job has processed all the data in "init-post-topic", I
create a savepoint for the job; call it save-point-a.
# Third, I modify my code:
## the data source becomes the Kafka topic "binlog_post_topic",
## the other operators do not change,
## and the "binlog_post_topic" consumer is set to start from a specific timestamp
(the time the MySQL snapshot was created).
# Fourth, I restart my job from save-point-a.
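The start timestamp in the third step is an epoch value in milliseconds. A minimal sketch of deriving it follows; the snapshot time and the class name `StartTimestampSketch` are hypothetical, and the sketch assumes a connector version whose consumer exposes `setStartFromTimestamp(long)`.

```java
import java.time.Instant;

final class StartTimestampSketch {
    // Hypothetical snapshot time; the real one is not stated in this report.
    static final String SNAPSHOT_TIME_UTC = "2020-05-10T00:00:00Z";

    // Converts an ISO-8601 instant to epoch milliseconds.
    static long startupTimestampMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        long startTimestamp = startupTimestampMillis(SNAPSHOT_TIME_UTC);
        // In the job, this value would be handed to the Kafka source, e.g.
        // consumer.setStartFromTimestamp(startTimestamp);
        System.out.println("start timestamp (ms): " + startTimestamp);
    }
}
```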
But I find that the Kafka consumer for "binlog_post_topic" does not consume
from the timestamp I set, but from the earliest offset. I find this log in the
TaskManager:
{code:java}
2020-05-11 17:20:47,228 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer
subtask 0 restored state: {}.
...
2020-05-12 20:14:52,641 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer
subtask 0 will start reading 1 partitions with offsets in restored state:
{KafkaTopicPartition{topic='binlog-kk_social-post', partition=0}=-915623761775}
2020-05-11 17:20:47,414 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer
subtask 0 creating fetcher with offsets
{KafkaTopicPartition{topic='binlog-kk_social-post', partition=0}=-915623761775}.
{code}
I guess this is caused by FlinkKafkaConsumerBase.
I found the following code
in the method FlinkKafkaConsumerBase.initializeState():
{code:java}
if (context.isRestored() && !restoredFromOldState) {
    restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
    ....
{code}
This code means that if a job is restarted from a savepoint, restoredState
will not be null; it will be at least an empty TreeMap.
Then, in FlinkKafkaConsumerBase.open():
{code:java}
if (restoredState != null) {
    for (KafkaTopicPartition partition : allPartitions) {
        if (!restoredState.containsKey(partition)) {
            restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
        }
    }
{code}
This is where the consumer is initialized. If a job is restarted from a
savepoint, restoredState is at least an empty TreeMap, so this code sets
every partition that is missing from the restored state to consume from
KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET, regardless of the
configured start position.
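The interaction of the two methods can be reproduced outside Flink. The sketch below is a simplified model, not the connector code: partitions are plain strings, the class `EmptyRestoreSketch` and the resolved offset `4200` are hypothetical, and the constant takes the offset value printed in the log above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class EmptyRestoreSketch {
    // Stand-in for KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET; the value
    // is copied from the offsets printed in the TaskManager log.
    static final long EARLIEST_OFFSET = -915623761775L;

    // Models initializeState(): on restore the map is always created, so a
    // savepoint without offsets for this source yields an empty, non-null map.
    static Map<String, Long> initializeState(boolean isRestored, Map<String, Long> savedOffsets) {
        if (!isRestored) {
            return null;
        }
        Map<String, Long> restoredState = new TreeMap<>();
        restoredState.putAll(savedOffsets);
        return restoredState;
    }

    // Models the branch in open(): with a non-null restored state, every
    // missing partition gets EARLIEST_OFFSET and the configured start
    // position (hypothetically resolved to configuredStartOffset) is ignored.
    static Map<String, Long> open(Map<String, Long> restoredState,
                                  List<String> allPartitions,
                                  long configuredStartOffset) {
        Map<String, Long> startOffsets = new TreeMap<>();
        if (restoredState != null) {
            for (String partition : allPartitions) {
                restoredState.putIfAbsent(partition, EARLIEST_OFFSET);
            }
            startOffsets.putAll(restoredState);
        } else {
            for (String partition : allPartitions) {
                startOffsets.put(partition, configuredStartOffset);
            }
        }
        return startOffsets;
    }

    public static void main(String[] args) {
        Map<String, Long> restored = initializeState(true, new TreeMap<>());
        List<String> partitions = Arrays.asList("binlog-kk_social-post-0");
        // The single partition ends up at EARLIEST_OFFSET, not at 4200.
        System.out.println(open(restored, partitions, 4200L));
    }
}
```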
I changed the condition like this:
{code:java}
if (restoredState != null && !restoredState.isEmpty()) {
....
{code}
and this works well for me.
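The effect of the extra isEmpty() check can be illustrated with a simplified model. This is a sketch under assumptions, not the connector code: partitions are plain strings, and the class `PatchedRestoreSketch`, the partition names, and the offsets `4200` and `100` are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PatchedRestoreSketch {
    // Stand-in for KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET.
    static final long EARLIEST_OFFSET = -915623761775L;

    // With the changed condition, an empty restored state is treated like a
    // fresh start, so the configured start position applies again.
    static Map<String, Long> startOffsets(Map<String, Long> restoredState,
                                          List<String> allPartitions,
                                          long configuredStartOffset) {
        Map<String, Long> result = new TreeMap<>();
        if (restoredState != null && !restoredState.isEmpty()) {
            // Non-empty restore: keep restored offsets and start partitions
            // that were not in the state from the earliest offset.
            result.putAll(restoredState);
            for (String partition : allPartitions) {
                result.putIfAbsent(partition, EARLIEST_OFFSET);
            }
        } else {
            // No state or empty state: use the configured start position.
            for (String partition : allPartitions) {
                result.put(partition, configuredStartOffset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("topic-0", "topic-1");
        System.out.println(startOffsets(new TreeMap<>(), partitions, 4200L));
        Map<String, Long> restored = new TreeMap<>();
        restored.put("topic-0", 100L);
        System.out.println(startOffsets(restored, partitions, 4200L));
    }
}
```

A restore that does carry offsets keeps the existing behavior: only the empty-state case changes.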