Kafka producer error after cancelling a Flink job

2021-11-18 Post by yu...@kiscloud.net
Hi All:
   After a normally running Flink job was cancelled, the Flink log printed the messages below. How should I trace the root cause of this kind of exception?

TY-APP-DATA-REAL-COMPUTATION - [2021-11-18 22:10:32.465] - INFO  
[RecordComputeOperator -> Sink: wi-data-sink (3/10)] 
org.apache.flink.runtime.taskmanager.Task  - RecordComputeOperator ->
Sink: wi-data-sink (3/10) (b2166a7da829182804f4557a16eabc58) switched from 
CANCELING to CANCELED.
TY-APP-DATA-REAL-COMPUTATION - [2021-11-18 22:10:32.466] - INFO  
[RecordComputeOperator -> Sink: wi-data-sink (9/10)] 
org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
TY-APP-DATA-REAL-COMPUTATION - [2021-11-18 22:10:32.467] - ERROR 
[RecordComputeOperator -> Sink: wi-data-sink (9/10)] 
org.apache.flink.streaming.runtime.tasks.StreamTask  - Error during disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to 
send data to Kafka: Failed to close kafka producer
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:1026)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.close(FlinkKafkaProducer011.java:691)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: Failed to close kafka 
producer
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1062)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.close(FlinkKafkaProducer.java:195)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.close(FlinkKafkaProducer011.java:674)
... 8 common frames omitted
Caused by: java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1253)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
... 12 common frames omitted
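
This particular error is generally benign on cancel. Flink cancels a task by interrupting its thread; KafkaProducer.close() is invoked with an effectively infinite timeout (Long.MAX_VALUE ms, the 9223372036854775807 in the log above) and blocks in Thread.join() on the producer's I/O thread, so the interrupt surfaces as the InterruptedException in the trace. A minimal standalone sketch of that interaction (hypothetical class and thread names, not Flink code):

public class InterruptedCloseDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for the Kafka producer's background I/O thread.
        Thread ioThread = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // I/O thread told to shut down.
            }
        });
        ioThread.start();

        // Stand-in for the task thread running KafkaProducer.close(Long.MAX_VALUE).
        Thread taskThread = new Thread(() -> {
            try {
                ioThread.join(); // blocks indefinitely, like timeoutMillis = Long.MAX_VALUE
            } catch (InterruptedException e) {
                // Matches the "Caused by: java.lang.InterruptedException" in the log.
                System.out.println("close() interrupted during cancel: " + e);
            }
        });
        taskThread.start();

        Thread.sleep(200);      // let both threads block
        taskThread.interrupt(); // what Flink's Task does on cancel
        ioThread.interrupt();   // clean up the demo thread
    }
}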



Flink Kafka producer fails at runtime

2021-04-30 Post by tanggen...@163.com
I'm using Flink to consume one Kafka topic and route late data to a new topic through a side output. The job keeps throwing the error below, every time a checkpoint is committed, and then restarts.
Please advise: are there other settings I need to make?
2021-04-30 17:00:51
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1282)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:794)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.sideOutput(WindowOperator.java:558)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:422)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.
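
The usual cause of this ProducerFencedException with the exactly-once Kafka sink is the one the message itself names: the transaction expires on the broker before Flink commits it at the checkpoint. The commonly suggested fix is to set the producer's transaction.timeout.ms comfortably above the checkpoint interval while keeping it at or below the broker's transaction.max.timeout.ms (15 minutes by default). A hedged sketch; the broker address, topic name, and timeout value are assumptions to adapt to your setup:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LateDataSinkFactory {

    static FlinkKafkaProducer<String> buildLateDataSink() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // assumption: your brokers
        // Must cover the checkpoint interval and must not exceed the broker's
        // transaction.max.timeout.ms (default 15 min).
        props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

        return new FlinkKafkaProducer<>(
                "late-data-topic", // assumption: the side-output topic
                (KafkaSerializationSchema<String>) (element, timestamp) ->
                        new ProducerRecord<>("late-data-topic",
                                element.getBytes(StandardCharsets.UTF_8)),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}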




tanggen...@163.com


Flink sink to Kafka fails with "Failed to construct kafka producer"

2021-01-21 Post by lp
Flink 1.11.2.
A custom source generates data in a loop and sinks it to Kafka.
The job is deployed to YARN in application mode.
jobmanager.log reports the error below (the JobManager and TaskManager containers were both allocated; both report the same error):

2021-01-21 10:52:17,742 INFO 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - Calculating tasks to restart to recover the failed task
cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:17,742 INFO 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - 1 tasks should be restarted to recover the failed task
cbc357ccb763df2852fee8c4fc7d55f2_0. 
2021-01-21 10:52:17,742 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
kafkaSink -- flink ???kafka??? (315f9a7b42afb08b4de1841a5b3c0f76)
switched from state RUNNING to RESTARTING.
2021-01-21 10:52:18,743 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
kafkaSink -- flink ???kafka??? (315f9a7b42afb08b4de1841a5b3c0f76)
switched from state RESTARTING to RUNNING.
2021-01-21 10:52:18,743 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250)
switched from CREATED to SCHEDULED.
2021-01-21 10:52:18,743 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250)
switched from SCHEDULED to DEPLOYING.
2021-01-21 10:52:18,744 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Deploying
Source: Custom Source -> Sink: Unnamed (1/1) (attempt #229) to
container_1611044725922_0017_01_02 @ slave02 (dataPort=39278)
2021-01-21 10:52:18,748 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250)
switched from DEPLOYING to RUNNING.
2021-01-21 10:52:18,753 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250)
switched from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@75c6d62a.
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
~[quickstart-0.1.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecu

Can the kafka producer dynamically discover newly added partitions?

2020-12-28 Post by ??????????
For the Kafka source, flink.partition-discovery.interval-millis takes care of this, but for the Kafka sink the partition list is cached in a map:


int[] partitions = (int[]) this.topicPartitionsMap.get(targetTopic);
if (null == partitions) {
    partitions = getPartitionsByTopic(targetTopic, transaction.producer);
    this.topicPartitionsMap.put(targetTopic, partitions);
}
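
That cache is the issue: topicPartitionsMap is filled once per topic and never refreshed, so a sink that uses a partitioner keeps writing to the partitions it saw at startup. One known workaround is to pass no custom partitioner at all, leaving partition selection to the Kafka client, whose metadata does refresh and will pick up partitions added later. A sketch, assuming a String stream; the topic and broker address are placeholders:

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class NoFixedPartitionerSink {

    static FlinkKafkaProducer<String> build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // assumption

        // Optional.empty() means no FlinkFixedPartitioner: records carry no
        // explicit partition, so the Kafka client spreads them over all the
        // partitions it currently knows about, including ones added after
        // the job started.
        return new FlinkKafkaProducer<>(
                "sink-topic", // assumption: your sink topic
                new SimpleStringSchema(),
                props,
                Optional.empty());
    }
}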

Can the flink kafka producer dynamically discover new partitions after expansion?

2020-12-25 Post by ??????????
Hi, I have a source Kafka and a sink Kafka. When the amount of data to process grows, I need to expand the Kafka topic's partition count, but I don't want to restart the job for that to take effect.
For the source Kafka, I use flink.partition-discovery.interval-millis, and it picks up and consumes the new partitions after I expand the topic's partition number.
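
For reference, that source-side setting is just a consumer property; a sketch with placeholder broker, topic, and interval:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class DiscoveringSource {

    static FlinkKafkaConsumer<String> build() {
        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "broker:9092"); // assumption
        // Check for newly added partitions every 30 seconds (value is an assumption).
        consumerProps.setProperty("flink.partition-discovery.interval-millis", "30000");

        return new FlinkKafkaConsumer<>(
                "source-topic", // assumption: your source topic
                new SimpleStringSchema(),
                consumerProps);
    }
}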


But the sink Kafka doesn't work this way:


The flink kafka producer gets the topic's partition list and caches it in topicPartitionsMap, as shown in class FlinkKafkaProducer.


Re: 1.11 kafka producer writes to only one partition

2020-08-18 Post by cs
I took a look; you are probably using the flink-connector-kafka_2.12 dependency.
That one can indeed show the behavior you describe, sending everything to one partition.
I'd suggest using flink-connector-kafka-{kafka version} instead,
e.g. flink-connector-kafka-0.8.







Re: 1.11 kafka producer writes to only one partition

2020-08-18 Post by cs
??flink1.11??







Re: 1.11 kafka producer writes to only one partition

2020-08-17 Post by x2009438
Yes, I took a look at the source code.

Because I'm using SimpleStringSchema, which is not a keyed schema, FlinkFixedPartitioner's open() method was skipped and the subtask index was never initialized, so every subtask computes 0 % partitions = 0 and everything is written to partition 0.
This feels odd to me, not quite logical.

Sent from my iPhone
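
For context, the logic being discussed is essentially the following (a simplified paraphrase of FlinkFixedPartitioner, not the verbatim source):

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class FixedPartitionerSketch<T> extends FlinkKafkaPartitioner<T> {

    private int parallelInstanceId; // stays 0 if open() is never called

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // If open() was skipped, parallelInstanceId is 0 for every subtask,
        // so this always returns partitions[0]: the "everything goes to
        // partition 0" symptom described above.
        return partitions[parallelInstanceId % partitions.length];
    }
}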

>> On Aug 17, 2020, at 23:28, cs <58683...@qq.com> wrote:
> The Kafka producer's partitioner is currently the class
> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
> the actual partitioning method is
> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner#partition.
> From that method you can tell which Kafka partition each subtask sends to: each subtask's data is written to one fixed partition only.
> 
> -- Original Message --
> From: "user-zh"
> Sent: Monday, August 17, 2020, 22:03
> To: "user-zh"
> Subject: 1.11 kafka producer writes to only one partition
> 
> I see that Flink 1.11's Kafka sink defaults to FlinkFixedPartitioner, which should write all of each sink subtask's data to one particular partition, right? But I ran into something strange: with the job's overall parallelism at 5, the results are only written to partition 0. After replacing the partitioner with null, records are written to every partition round-robin. Very puzzling. Any help appreciated.
> 
> Sent from my iPhone



Re: 1.11 kafka producer writes to only one partition

2020-08-17 Post by Matrix42
FlinkFixedPartitioner writes each subTask's data to a fixed partition; if you pass null as the partitioner, Kafka's own default partitioner is used instead, which writes round-robin.





Re: 1.11 kafka producer writes to only one partition

2020-08-17 Post by cs
The Kafka producer's partitioner is currently the class
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
the actual partitioning method is org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner#partition.
From that method you can tell which Kafka partition each subtask sends to: each subtask's data is written to one fixed partition only.





1.11 kafka producer writes to only one partition

2020-08-17 Post by x2009438
I see that Flink 1.11's Kafka sink defaults to FlinkFixedPartitioner, which should write all of each sink subtask's data to one particular partition, right? But I ran into something strange: with the job's overall parallelism at 5, the results are only written to partition 0. After replacing the partitioner with null, records are written to every partition round-robin. Very puzzling. Any help appreciated.

Sent from my iPhone