Hi

The root cause of the checkpoint error is that the producer failed to send data to Kafka during
'preCommit'. The real fix is to make sure data can be sent to Kafka successfully,
which is probably a matter for the Kafka side.

If you cannot guarantee the health of Kafka from its client and you do not need
exactly-once semantics, you can pass FlinkKafkaProducer.Semantic.NONE when creating
the Kafka producer, which stops it from sending (flushing) data during 'preCommit'.
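
A minimal sketch of the Semantic.NONE option, assuming the universal flink-connector-kafka
shipped with Flink 1.9. The class and helper names (NoneSemanticSinkExample, buildProducer),
the broker address, the topic and the use of String records with SimpleStringSchema are
illustrative placeholders; in the job below you would presumably wrap KafkaEventDeSchema
instead. Keep in mind that NONE gives no delivery guarantee at all: records can be lost or
duplicated on failure.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class NoneSemanticSinkExample {

    // Hypothetical helper: builds a Kafka sink with Semantic.NONE, so the
    // producer does not flush pending records in preCommit during a checkpoint.
    static FlinkKafkaProducer<String> buildProducer(String brokers, String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        return new FlinkKafkaProducer<String>(
                topic,
                new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer.Semantic.NONE);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // placeholder checkpoint interval in ms

        DataStream<String> input = env.fromElements("a", "b", "c");
        // Placeholder broker and topic; replace with the job's real values.
        input.addSink(buildProducer("localhost:9092", "output-topic"));

        // Prints the job graph without contacting Kafka; call env.execute(...)
        // instead to actually run the job against a broker.
        System.out.println(env.getExecutionPlan());
    }
}
```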

If you don't want the job to fail because of checkpoint errors, you can increase the
tolerable checkpoint failure number (tolerableDeclinedCheckpointNumber below):

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableDeclinedCheckpointNumber);
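
For completeness, a minimal self-contained version of that setting, assuming Flink 1.9's
CheckpointConfig API. The class name, the configure helper, the 60 s checkpoint interval
and the threshold of 3 are illustrative placeholders. Judging from Log-1, the job was failed
on the very first declined checkpoint, so a value above 0 lets it survive that many
consecutive failures. The failing checkpoints are still discarded, so this hides the Kafka
timeout rather than fixing it.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerableCheckpointFailureExample {

    // Hypothetical helper: enables checkpointing and tolerates up to
    // tolerableDeclinedCheckpointNumber consecutive checkpoint failures
    // before the job is failed.
    static StreamExecutionEnvironment configure(int tolerableDeclinedCheckpointNumber) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // placeholder interval in ms
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableDeclinedCheckpointNumber);
        return env;
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = configure(3); // placeholder threshold
        System.out.println("tolerable checkpoint failures: "
                + env.getCheckpointConfig().getTolerableCheckpointFailureNumber());
    }
}
```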

Best
Yun Tang
________________________________
From: jose farfan <josef...@gmail.com>
Sent: Wednesday, January 15, 2020 23:21
To: ouywl <ou...@139.com>
Cc: user <u...@flink.apache.org>; user-zh@flink.apache.org <user-zh@flink.apache.org>
Subject: Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The streamTask checkpoint error .

Hi

I have the same issue.

BR
Jose

On Thu, 9 Jan 2020 at 10:28, ouywl <ou...@139.com> wrote:
Hi all:
When I use Flink 1.9.1 to produce data to Kafka 1.1.1, the error shown in Log-1
occurs. The code is:

input.addSink(
        new FlinkKafkaProducer<KafkaEvent>(
                parameterTool.getRequired("bootstrap.servers"),
                parameterTool.getRequired("output-topic"),
                new KafkaEventDeSchema()));

Log-1:
2020-01-09 09:13:44,476 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1578561224466 for job d8827b3f4165b6ba27c8b59c7aa1a400.
2020-01-09 09:15:33,069 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task f643244ff791dbd3fbfb88bfafdf1872 of job d8827b3f4165b6ba27c8b59c7aa1a400 at ee8e6d8e92f9a59f578b1de2edd73537 @ producedata-taskmanager-d59d5cb7c-pv27j (dataPort=33361).
2020-01-09 09:15:33,070 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job d8827b3f4165b6ba27c8b59c7aa1a400.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:311)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    ... 17 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
2020-01-09 09:15:33,074 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job producer data frequece (d8827b3f4165b6ba27c8b59c7aa1a400) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1443)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1353)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:722)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Best,
Ouywl