Kafka producer error after cancelling a Flink job
Hi All: After cancelling a Flink job that had been running normally, the following messages were printed in the Flink log. How can I track down the root cause of this exception?

TY-APP-DATA-REAL-COMPUTATION - [2021-11-18 22:10:32.465] - INFO [RecordComputeOperator -> Sink: wi-data-sink (3/10)] org.apache.flink.runtime.taskmanager.Task - RecordComputeOperator -> Sink: wi-data-sink (3/10) (b2166a7da829182804f4557a16eabc58) switched from CANCELING to CANCELED.
TY-APP-DATA-REAL-COMPUTATION - [2021-11-18 22:10:32.466] - INFO [RecordComputeOperator -> Sink: wi-data-sink (9/10)] org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
TY-APP-DATA-REAL-COMPUTATION - [2021-11-18 22:10:32.467] - ERROR [RecordComputeOperator -> Sink: wi-data-sink (9/10)] org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: Failed to close kafka producer
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:1026)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.close(FlinkKafkaProducer011.java:691)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: Failed to close kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1062)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.close(FlinkKafkaProducer.java:195)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.close(FlinkKafkaProducer011.java:674)
    ... 8 common frames omitted
Caused by: java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1253)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
    ... 12 common frames omitted
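The innermost cause in the trace is an InterruptedException thrown from Thread.join inside KafkaProducer.close, and the close was logged with timeoutMillis = 9223372036854775807 (Long.MAX_VALUE). One plausible reading, not confirmed anywhere in this thread, is that cancellation interrupted the task thread while it was waiting without a bound for the producer's background thread to finish. The stand-alone Java sketch below reproduces only that pattern with plain threads. InterruptedCloseSketch, ioThread and taskThread are hypothetical names, and no Flink or Kafka classes are involved.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the pattern in the trace: a thread blocks in
// Thread.join() with no timeout and is interrupted while it waits.
class InterruptedCloseSketch {

    /** Returns true if the waiting thread observed an InterruptedException. */
    static boolean closeWhileInterrupted() {
        // Stand-in for the producer's background thread: it never finishes by itself.
        Thread ioThread = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException ignored) {
                // released at the end of the demo
            }
        });
        ioThread.setDaemon(true);
        ioThread.start();

        boolean[] sawInterrupt = {false};
        CountDownLatch aboutToJoin = new CountDownLatch(1);

        // Stand-in for the task thread that calls close().
        Thread taskThread = new Thread(() -> {
            aboutToJoin.countDown();
            try {
                ioThread.join(); // unbounded wait, comparable to a Long.MAX_VALUE timeout
            } catch (InterruptedException e) {
                sawInterrupt[0] = true; // the root cause reported at the bottom of the trace
            }
        });
        taskThread.start();

        try {
            aboutToJoin.await();
            taskThread.interrupt(); // stand-in for cancellation interrupting the task thread
            taskThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        } finally {
            ioThread.interrupt(); // let the background thread exit
        }
        return sawInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println("close() interrupted: " + closeWhileInterrupted());
    }
}
```

If this reading applies, the ERROR is raised during cleanup of a task that has already been cancelled, which matches the "switched from CANCELING to CANCELED" line logged just before it.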
Flink Kafka producer fails at runtime
In Flink I consume a Kafka topic and send late records through a side output to a new topic. The error below appears almost constantly: it is thrown every time a checkpoint is committed, and then the job restarts. Could you advise whether some additional settings are needed?

2021-04-30 17:00:51
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1282)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:794)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.sideOutput(WindowOperator.java:558)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:422)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

tanggen...@163.com
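The exception message names two possible causes: a newer producer with the same transactionalId, or a transaction that the broker has expired. The post does not say which one applies here. For the second case, a minimal sketch of the setting usually examined first is shown below, using only java.util.Properties. The key transaction.timeout.ms is a standard Kafka producer setting. TransactionTimeoutSketch, its helper methods and all durations are hypothetical and only illustrate the comparison between the transaction timeout and the checkpoint cadence.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper: builds producer properties and checks that a transaction
// opened at one checkpoint could stay alive until the next checkpoint completes.
class TransactionTimeoutSketch {

    // Standard Kafka producer configuration key.
    static final String TRANSACTION_TIMEOUT_MS = "transaction.timeout.ms";

    static Properties producerProps(String bootstrapServers, Duration transactionTimeout) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty(TRANSACTION_TIMEOUT_MS, Long.toString(transactionTimeout.toMillis()));
        return props;
    }

    /** True if the configured timeout is longer than one checkpoint interval plus its worst-case duration. */
    static boolean timeoutCoversCheckpoints(
            Properties props, Duration checkpointInterval, Duration maxCheckpointDuration) {
        long timeoutMs = Long.parseLong(props.getProperty(TRANSACTION_TIMEOUT_MS));
        return timeoutMs > checkpointInterval.plus(maxCheckpointDuration).toMillis();
    }

    public static void main(String[] args) {
        // Assumed values for illustration only.
        Properties props = producerProps("localhost:9092", Duration.ofMinutes(15));
        System.out.println(timeoutCoversCheckpoints(props, Duration.ofMinutes(5), Duration.ofMinutes(1)));
    }
}
```

The producer-side value also has to stay within the broker's transaction.max.timeout.ms, so raising it may require a matching broker change. If the first cause applies instead (two producers sharing a transactionalId), this setting would not help.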
Flink sink to Kafka fails with "Failed to construct kafka producer"
Flink 1.11.2. A custom source generates data in a loop and sinks it to Kafka. The job is deployed to YARN in Application Mode, and jobmanager.log reports the following error (the containers for both the JobManager and the TaskManager were allocated, and both report the same error):

2021-01-21 10:52:17,742 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:17,742 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:17,742 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job kafkaSink -- flink ???kafka??? (315f9a7b42afb08b4de1841a5b3c0f76) switched from state RUNNING to RESTARTING.
2021-01-21 10:52:18,743 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job kafkaSink -- flink ???kafka??? (315f9a7b42afb08b4de1841a5b3c0f76) switched from state RESTARTING to RUNNING.
2021-01-21 10:52:18,743 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250) switched from CREATED to SCHEDULED.
2021-01-21 10:52:18,743 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250) switched from SCHEDULED to DEPLOYING.
2021-01-21 10:52:18,744 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #229) to container_1611044725922_0017_01_02 @ slave02 (dataPort=39278)
2021-01-21 10:52:18,748 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250) switched from DEPLOYING to RUNNING.
2021-01-21 10:52:18,753 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@75c6d62a.
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[quickstart-0.1.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecu
kafka producer
kafka source: flink.partition-discovery.interval-millis
kafka sink: partitionmap

    // Look up the cached partition list for the topic; query the producer only on a cache miss.
    int[] partitions = (int[]) this.topicPartitionsMap.get(targetTopic);
    if (null == partitions) {
        partitions = getPartitionsByTopic(targetTopic, transaction.producer);
        this.topicPartitionsMap.put(targetTopic, partitions);
    }
Can the Flink Kafka producer dynamically discover new partitions after expansion?
Hi, I have a Kafka source and a Kafka sink. When the amount of data to process grows, I need to increase the number of partitions of the Kafka topic, but I don't want to restart the job for the change to take effect. For the Kafka source I use flink.partition-discovery.interval-millis, and it does consume the new partitions after I expand the topic. The Kafka sink does not work like this: the Flink Kafka producer fetches the topic's partition list and caches it in topicPartitionsMap, as shown in the class FlinkKafkaProducer.
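The caching behaviour described in this question can be illustrated with a small stand-alone Java sketch. Only the name topicPartitionsMap comes from the post. PartitionCacheSketch and metadataLookup are hypothetical stand-ins, and the sketch models the reported behaviour without using the connector's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.IntStream;

// Hypothetical model of a sink that resolves a topic's partitions once and
// then serves every later request from its cache.
class PartitionCacheSketch {

    private final Map<String, int[]> topicPartitionsMap = new HashMap<>();

    // Stand-in for asking the Kafka producer for the topic's current partitions.
    private final Function<String, int[]> metadataLookup;

    PartitionCacheSketch(Function<String, int[]> metadataLookup) {
        this.metadataLookup = metadataLookup;
    }

    int[] partitionsFor(String targetTopic) {
        int[] partitions = topicPartitionsMap.get(targetTopic);
        if (partitions == null) {
            // Cache miss: this is the only place where fresh metadata is read.
            partitions = metadataLookup.apply(targetTopic);
            topicPartitionsMap.put(targetTopic, partitions);
        }
        return partitions;
    }

    public static void main(String[] args) {
        int[] partitionCount = {3}; // the topic starts with 3 partitions
        PartitionCacheSketch sink =
                new PartitionCacheSketch(topic -> IntStream.range(0, partitionCount[0]).toArray());

        System.out.println(sink.partitionsFor("events").length); // 3
        partitionCount[0] = 6;                                   // the topic is expanded
        System.out.println(sink.partitionsFor("events").length); // still 3: served from the cache
    }
}
```

In this model nothing ever invalidates the cached entry, so an expanded topic is only picked up by a fresh instance, which corresponds to the restart the poster wants to avoid.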
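Re: 1.11 Kafka producer writes to only one partition
I took a look. You are probably using the flink-connector-kafka_2.12 dependency, right? That one does behave as you describe and sends everything to a single partition. I suggest using flink-connector-kafka-{Kafka version}, for example flink-connector-kafka-0.8. --Original message-- From: "user-zh"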
Re: 1.11 Kafka producer writes to only one partition
flink1.11 --Original message-- From: "user-zh"
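Re: 1.11 Kafka producer writes to only one partition
Yes, I looked at the source code. Because I use SimpleStringSchema, which is not a keyed schema, the open method of FlinkFixedPartitioner is skipped and the subtask index is never initialized. Every subtask therefore computes 0 % partition = 0, so everything is written to partition 0. This feels odd to me and not very logical.

Sent from my iPhone

On 17 Aug 2020, at 23:28, cs <58683...@qq.com> wrote:
> The Kafka producer's partitioner currently uses the class org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner.
> The actual partitioning method is org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner#partition.
> From that method you can tell which Kafka partition each subtask sends to. The data of each subtask is only ever written to one fixed partition.
>
> --Original message--
> From: "user-zh"
> Sent: Monday, 17 Aug 2020, 10:03 PM
> To: "user-zh"
> Subject: 1.11 Kafka producer writes to only one partition
>
> > I see that in 1.11 the Kafka sink uses FlinkFixedPartitioner by default, which should write all the data of each sink subtask to one particular partition, right? But I ran into something strange: with an overall job parallelism of 5, the results are written only to partition 0. After replacing the partitioner with null, the data is written to every partition in round-robin fashion. I find this very strange. Any help is appreciated.
> >
> > Sent from my iPhone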
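The arithmetic in this explanation can be checked with a simplified Java model. FixedPartitionerSketch is a hypothetical stand-in that keeps only the two elements the message relies on: an open step that records the subtask index, and a partition step that computes index % number of partitions. It is an illustration of the reported behaviour under those assumptions, not the connector's implementation.

```java
import java.util.Arrays;

// Hypothetical, simplified model of a "fixed" partitioner: each subtask maps
// to partitions[subtaskIndex % partitions.length].
class FixedPartitionerSketch {

    // An int field defaults to 0, so a skipped open() leaves every instance at index 0.
    private int parallelInstanceId;

    void open(int parallelInstanceId) {
        this.parallelInstanceId = parallelInstanceId;
    }

    int partition(int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    /** Target partition chosen by each subtask, with or without the open() call. */
    static int[] targets(int parallelism, int[] partitions, boolean callOpen) {
        int[] result = new int[parallelism];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            FixedPartitionerSketch partitioner = new FixedPartitionerSketch();
            if (callOpen) {
                partitioner.open(subtask);
            }
            result[subtask] = partitioner.partition(partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3, 4};
        System.out.println(Arrays.toString(targets(5, partitions, true)));  // [0, 1, 2, 3, 4]
        System.out.println(Arrays.toString(targets(5, partitions, false))); // [0, 0, 0, 0, 0]
    }
}
```

With a parallelism of 5 and five partitions, the second line reproduces the symptom from the original question: every subtask lands on partition 0 when the index is never set.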
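Re: 1.11 Kafka producer writes to only one partition
FlinkFixedPartitioner picks the partition per subTask, so each subTask writes to a single partition. If the partitioner is set to null, Kafka's own partitioner is used, which is roundrobin. --Original message-- From: "x2009438"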
Re: 1.11 Kafka producer writes to only one partition
The Kafka producer's partitioner currently uses the class org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner. The actual partitioning method is org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner#partition. From that method you can tell which Kafka partition each subtask sends to. The data of each subtask is only ever written to one fixed partition. --Original message-- From: "user-zh"
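1.11 Kafka producer writes to only one partition
I see that in 1.11 the Kafka sink uses FlinkFixedPartitioner by default, which should write all the data of each sink subtask to one particular partition, right? But I ran into something strange: with an overall job parallelism of 5, the results are written only to partition 0. After replacing the partitioner with null, the data is written to every partition in round-robin fashion. I find this very strange. Any help is appreciated.

Sent from my iPhone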