Hi

I have the same issue.

BR
Jose

On Thu, 9 Jan 2020 at 10:28, ouywl <ou...@139.com> wrote:

> Hi all:
> When I use flink 1.9.1 and produce data to Kafka 1.1.1. the error was
> happen as* log-1,code is::*
>
> input.addSink(
>         new FlinkKafkaProducer<KafkaEvent>(
>                 parameterTool.getRequired("bootstrap.servers"),
>                 parameterTool.getRequired("output-topic"),
>                 new KafkaEventDeSchema()));
>
>
> *Log-1:*
> 2020-01-09 09:13:44,476 INFO org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator - Triggering checkpoint 1 @ 1578561224466 for job
> d8827b3f4165b6ba27c8b59c7aa1a400.
> 2020-01-09 09:15:33,069 INFO org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator - Decline checkpoint 1 by task
> f643244ff791dbd3fbfb88bfafdf1872 of job d8827b3f4165b6ba27c8b59c7aa1a400
> at ee8e6d8e92f9a59f578b1de2edd73537 @ producedata-taskmanager-d59d5cb7c-pv27j
> (dataPort=33361).
> 2020-01-09 09:15:33,070 INFO org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator - Discarding checkpoint 1 of job
> d8827b3f4165b6ba27c8b59c7aa1a400.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 1 for operator Sink: Unnamed (1/2). Failure reason:
> Checkpoint was declined.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .snapshotState(AbstractStreamOperator.java:431)
>     at org.apache.flink.streaming.runtime.tasks.
> StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask
> .java:1282)
>     at org.apache.flink.streaming.runtime.tasks.
> StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:
> 1216)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .checkpointState(StreamTask.java:872)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .performCheckpoint(StreamTask.java:777)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .triggerCheckpointOnBarrier(StreamTask.java:708)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
> .notifyCheckpoint(CheckpointBarrierHandler.java:88)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner
> .processBarrier(CheckpointBarrierAligner.java:113)
>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate
> .pollNext(CheckpointedInputGate.java:155)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .pollNextNullable(StreamTaskNetworkInput.java:102)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .pollNextNullable(StreamTaskNetworkInput.java:47)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:135)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:279)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:301)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:406)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:
> 120018 ms has passed since batch creation
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .checkErroneous(FlinkKafkaProducer.java:1196)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .flush(FlinkKafkaProducer.java:968)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .preCommit(FlinkKafkaProducer.java:892)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .preCommit(FlinkKafkaProducer.java:98)
>     at org.apache.flink.streaming.api.functions.sink.
> TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:
> 311)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .snapshotState(FlinkKafkaProducer.java:973)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> .trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> .snapshotFunctionState(StreamingFunctionUtils.java:99)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> .snapshotState(AbstractUdfStreamOperator.java:90)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .snapshotState(AbstractStreamOperator.java:399)
>     ... 17 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 58
> record(s) for k8s-test-data-0:120018 ms has passed since batch creation
> 2020-01-09 09:15:33,074 INFO org.apache.flink.runtime.executiongraph.
> ExecutionGraph - Job producer data frequece
> (d8827b3f4165b6ba27c8b59c7aa1a400) switched from state RUNNING to FAILING.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
> tolerable failure threshold.
>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager
> .handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> .failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1443)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> .discardCheckpoint(CheckpointCoordinator.java:1353)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> .receiveDeclineMessage(CheckpointCoordinator.java:722)
>     at org.apache.flink.runtime.scheduler.LegacyScheduler
> .lambda$declineCheckpoint$2(LegacyScheduler.java:573)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:
> 511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.
> ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(
> ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.
> ScheduledThreadPoolExecutor$ScheduledFutureTask.run(
> ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> Best,
> Ouywl
>
>

Reply via email to