Hi Oleg

If you only see the log entry "snapshotState fetcher: partition: 
KafkaTopicPartition{topic='dsp-producer-z-clickstream-web-raw', 
partition=2} offset:1091528771" even when the checkpoint of that subtask has 
reached 1 GB+, that is really weird, as the state in unionOffsetStates should 
be only one record.

Please follow the thread Jacob mentioned and use 
Checkpoints.loadCheckpointMetadata to load the _metadata file and see how many 
records are in the offsets metadata. BTW, could you also share the code of how 
you create the "dsp-producer-z-clickstream-web-raw" source?
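
A minimal sketch of that inspection, assuming the Flink 1.8 internals (there, 
Checkpoints.loadCheckpointMetadata lives in flink-runtime and returns a 
Savepoint; the signature changed in later releases, and the _metadata path 
below is a placeholder):

    // Sketch only: load a checkpoint's _metadata via Flink's internal API
    // (Flink 1.8 package names; later versions return CheckpointMetadata instead)
    // and print per-operator state sizes to spot the bloated source state.
    import java.io.DataInputStream;
    import java.io.FileInputStream;

    import org.apache.flink.runtime.checkpoint.Checkpoints;
    import org.apache.flink.runtime.checkpoint.OperatorState;
    import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;

    public class InspectCheckpointMetadata {
        public static void main(String[] args) throws Exception {
            String path = args.length > 0 ? args[0] : "/path/to/chk-1234/_metadata"; // placeholder
            try (DataInputStream in = new DataInputStream(new FileInputStream(path))) {
                Savepoint savepoint = Checkpoints.loadCheckpointMetadata(
                        in, InspectCheckpointMetadata.class.getClassLoader());
                for (OperatorState op : savepoint.getOperatorStates()) {
                    System.out.println("operator " + op.getOperatorID()
                            + " subtasks=" + op.getStates().size()
                            + " stateSize=" + op.getStateSize() + " bytes");
                }
            }
        }
    }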

Best
Yun Tang


________________________________
From: Jacob Sevart <jsev...@uber.com>
Sent: Saturday, April 18, 2020 9:22
To: Oleg Vysotsky <ol...@zillow.com>
Cc: Timo Walther <twal...@apache.org>; user@flink.apache.org 
<user@flink.apache.org>; Long Nguyen <lo...@zillow.com>
Subject: Re: Checkpoints for kafka source sometimes get 55 GB size (instead of 
2 MB) and flink job fails during restoring from such checkpoint

This sounds a lot like an issue I just went through 
(http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Very-large-metadata-file-td33356.html).
 Are you using a union list state anywhere?
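
For context, FlinkKafkaConsumerBase keeps its offsets in exactly that kind of 
state. Roughly, simplified from the Flink 1.8 sources (not verbatim; the real 
code builds the Tuple2 serializer by hand):

    // "Union" redistribution means that on restore EVERY subtask receives the
    // entries of ALL subtasks, not just its own slice, which is how union state
    // can inflate a checkpoint dramatically at high parallelism.
    unionOffsetStates = context.getOperatorStateStore().getUnionListState(
            new ListStateDescriptor<>(
                    "topic-partition-offset-states", // OFFSETS_STATE_NAME
                    TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));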

You could also use the debugging steps mentioned in that thread to inspect the 
contents of the bad checkpoint.

On Fri, Apr 17, 2020 at 4:46 PM Oleg Vysotsky <ol...@zillow.com> wrote:
Hi Timo,

Thank you for the suggestion!

Over the last two days I tried to find a reliable way to reproduce the problem 
and to pin down the root cause. The problem with "abnormal" checkpoints happens 
only on our largest Flink job (which processes 6k-10k events per second). 
Similar smaller jobs (same code) don't have this problem; e.g., a similar job 
that processes about a third as many events is unaffected. As a result, remote 
debugging is quite challenging. Instead of debugging, I added logging to 
FlinkKafkaConsumerBase#snapshotState and set commitOffsetsOnCheckpoints to 
false to disable the "additional" logic in FlinkKafkaConsumerBase#snapshotState 
(please see my temporary "log" changes below). The logging was as expected, 
e.g. {"lvl":"WARN","msg":"snapshotState fetcher: partition: 
KafkaTopicPartition{topic='dsp-producer-z-clickstream-web-raw', 
partition=2} offset:1091528771"}. I didn't find any example of a large entry 
being added to "unionOffsetStates".
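
For reference, disabling the on-checkpoint offset commits looks roughly like 
this (setCommitOffsetsOnCheckpoints is the real FlinkKafkaConsumerBase setter; 
the schema and kafkaProps here are placeholders):

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
            "dsp-producer-z-clickstream-web-raw", new SimpleStringSchema(), kafkaProps);
    // With checkpointing enabled this moves offsetCommitMode away from
    // ON_CHECKPOINTS, so snapshotState() skips the pendingOffsetsToCommit bookkeeping.
    consumer.setCommitOffsetsOnCheckpoints(false);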

It looks like the Flink job periodically (not often) creates a consecutive run 
of "bad" checkpoints, each of which still shows a reasonable checkpoint size 
for every operator. After restoring from such a "bad" checkpoint, the Flink job 
starts creating "abnormal" checkpoints that include 55 GB for the Kafka source 
operator (please check the attachments; "Source: dsp-producer-z-clickstream-raw" 
is the Kafka source). Creating an "abnormal" checkpoint is 100% reproducible in 
this case. Just in case: we recently switched from a Kinesis source to the 
Kafka source. We ran the same job with Kinesis for 1+ year and never had this 
problem. Any advice is appreciated.


@Override
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (!running) {
        LOG.debug("snapshotState() called on closed source");
    } else {
        unionOffsetStates.clear();

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        // debug addition: collect what gets written into unionOffsetStates
        StringBuilder sb = new StringBuilder("snapshotState ");
        if (fetcher == null) {
            sb.append("null fetcher: ");
            // the fetcher has not yet been initialized, which means we need to return the
            // originally restored offsets or the assigned partitions
            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
                sb.append("partition: ").append(subscribedPartition.getKey())
                        .append(" offset:").append(subscribedPartition.getValue()).append('\n');
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
            }
        } else {
            HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
            }

            sb.append("fetcher: ");
            for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                unionOffsetStates.add(Tuple2.of(
                        kafkaTopicPartitionLongEntry.getKey(),
                        kafkaTopicPartitionLongEntry.getValue()));
                sb.append("partition: ").append(kafkaTopicPartitionLongEntry.getKey())
                        .append(" offset:").append(kafkaTopicPartitionLongEntry.getValue()).append('\n');
            }
        }

        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // truncate the map of pending offsets to commit, to prevent infinite growth
            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                pendingOffsetsToCommit.remove(0);
            }
        }

        // debug addition: log exactly what was snapshotted
        LOG.warn(sb.toString());
    }
}



On 4/14/20, 11:44 PM, "Timo Walther" <twal...@apache.org> wrote:

    Hi Oleg,

    this sounds indeed like abnormal behavior. Are you sure that these large
    checkpoints are related to the Kafka consumer only? Are there other
    operators in the pipeline? Because internally the state kept in a Kafka
    consumer is pretty minimal and only related to Kafka partition and
    offset management.

    If you are sure that the Kafka consumer must be producing such a state size,
    I would recommend attaching a remote debugger and checking what is
    checkpointed in the corresponding `FlinkKafkaConsumerBase#snapshotState`.

    Regards,
    Timo


    On 15.04.20 03:37, Oleg Vysotsky wrote:
    > Hello,
    >
    > Sometimes our Flink job starts creating large checkpoints which include
    > 55 GB (instead of 2 MB) related to the Kafka source. After the Flink job
    > creates the first “abnormal” checkpoint, all subsequent checkpoints are
    > “abnormal” as well. The Flink job can’t be restored from such a
    > checkpoint: restoring hangs/fails, the Flink dashboard hangs, and the
    > Flink cluster crashes during the restore. We didn’t catch a related
    > error message, and we haven’t found a clear way to reproduce the problem
    > (i.e., to make the Flink job create “abnormal” checkpoints).
    >
    > Configuration:
    >
    > We are using Flink 1.8.1 on EMR (EMR 5.27)
    >
    > Kafka: Confluent Kafka 5.4.1
    >
    > Flink Kafka connector:
    >   org.apache.flink:flink-connector-kafka_2.11:1.8.1 (which includes the
    > org.apache.kafka:kafka-clients:2.0.1 dependency)
    >
    > Our input Kafka topic has 32 partitions and the related Flink source has a
    > parallelism of 32
    >
    > We use pretty much all default Flink Kafka consumer settings. We only
    > specified:
    >
    > CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
    >
    > ConsumerConfig.GROUP_ID_CONFIG,
    >
    > CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
    >
    > Thanks a lot  in advance!
    >
    > Oleg
    >
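
(For reference, the setup described above boils down to roughly the following; 
every value is a placeholder, and only the config keys, the topic, and the 
parallelism come from this thread.)

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class ClickstreamJobSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "dsp-clickstream-consumer");    // placeholder
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");      // placeholder

            FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
                    "dsp-producer-z-clickstream-web-raw", new SimpleStringSchema(), props);
            env.addSource(source).setParallelism(32); // 32 partitions -> source parallelism 32
            // ... rest of the pipeline elided ...
            env.execute();
        }
    }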




--
Jacob Sevart
Software Engineer, Safety
