[ https://issues.apache.org/jira/browse/FLINK-23674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17400072#comment-17400072 ]
Arvid Heise edited comment on FLINK-23674 at 8/17/21, 5:30 AM: --------------------------------------------------------------- Of course {noformat} [root@kafka3 flink-1.13.1]# bin/flink run -n -c com.shanjiancaofu.live.job.ChargeJob -s file:/soft/opt/checkpoint/0d294a9fbed5da53390d048db00d557d/chk-44/_metadata ./ad-live-process-0.11-jar-with-dependencies.jar[root@kafka3 flink-1.13.1]# bin/flink run -n -c com.shanjiancaofu.live.job.ChargeJob -s file:/soft/opt/checkpoint/0d294a9fbed5da53390d048db00d557d/chk-44/_metadata ./ad-live-process-0.11-jar-with-dependencies.jar . ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: Job has been submitted with JobID a5859d58669b2ad5ec3798086526e64e------------------------------------------------------------ The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: a5859d58669b2ad5ec3798086526e64e) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: a5859d58669b2ad5ec3798086526e64e) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) at com.shanjiancaofu.live.job.ChargeJob.main(ChargeJob.java:155) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ... 8 moreCaused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: a5859d58669b2ad5ec3798086526e64e) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$24(RestClusterClient.java:670) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123) ... 24 moreCaused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=300000,backoffTimeMS=10000,maxFailuresPerInterval=1) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) at sun.reflect.GeneratedMethodAccessor105.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1370) at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at java.lang.Thread.run(Thread.java:748) {noformat} was (Author: s32967...@163.com): Of course {code:java} [root@kafka3 flink-1.13.1]# bin/flink run -n -c com.shanjiancaofu.live.job.ChargeJob -s file:/soft/opt/checkpoint/0d294a9fbed5da53390d048db00d557d/chk-44/_metadata ./ad-live-process-0.11-jar-with-dependencies.jar[root@kafka3 flink-1.13.1]# bin/flink run -n -c com.shanjiancaofu.live.job.ChargeJob -s file:/soft/opt/checkpoint/0d294a9fbed5da53390d048db00d557d/chk-44/_metadata ./ad-live-process-0.11-jar-with-dependencies.jar . ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: Job has been submitted with JobID a5859d58669b2ad5ec3798086526e64e------------------------------------------------------------ The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: a5859d58669b2ad5ec3798086526e64e) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: a5859d58669b2ad5ec3798086526e64e) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) at com.shanjiancaofu.live.job.ChargeJob.main(ChargeJob.java:155) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ... 8 moreCaused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: a5859d58669b2ad5ec3798086526e64e) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$24(RestClusterClient.java:670) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123) ... 24 moreCaused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=300000,backoffTimeMS=10000,maxFailuresPerInterval=1) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) at sun.reflect.GeneratedMethodAccessor105.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1370) at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at java.lang.Thread.run(Thread.java:748) {code} > flink restart with checkpoint ,kafka producer throw exception > -------------------------------------------------------------- > > Key: FLINK-23674 > URL: https://issues.apache.org/jira/browse/FLINK-23674 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.13.1 > Environment: flink:flink-1.13.1 > kafka: _2.12-2.5.0 > java: 1.8.0_161 > Reporter: meetsong > Priority: Major > > > when I test flink eos, and sink is kafka. first I click the button of > cancel on flink web ui , then I input following code on console > {code:java} > bin/flink run -n -c com.shanjiancaofu.live.job.ChargeJob -s > file:/soft/opt/checkpoint/072c0a72343c6e1f06b9bd37c5147cc0/chk-1/_metadata > ./ad-live-process-0.11-jar-with-dependencies.jar > {code} > , after 10 second throw a exception > {code:java} > Caused by: org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; Producer attempted an operation with an old epoch. > Either there is a newer producer with the same transactionalId, or the > producer's transaction has been expired by the broker. > {code} > and my code is : > {code:java} > package com.shanjiancaofu.live.job; > import com.alibaba.fastjson.JSON; > import lombok.AllArgsConstructor; > import lombok.Data; > import lombok.NoArgsConstructor; > import lombok.extern.slf4j.Slf4j; > import org.apache.commons.lang.SystemUtils; > import org.apache.flink.api.common.restartstrategy.RestartStrategies; > import org.apache.flink.api.common.serialization.SimpleStringSchema; > import org.apache.flink.api.common.state.ListState; > import org.apache.flink.api.common.state.ListStateDescriptor; > import org.apache.flink.api.common.time.Time; > import org.apache.flink.api.common.typeinfo.TypeHint; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.functions.KeySelector; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.runtime.state.filesystem.FsStateBackend; > import org.apache.flink.streaming.api.CheckpointingMode; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.environment.CheckpointConfig; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.KeyedProcessFunction; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; > import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; > import > org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper; > import org.apache.flink.util.Collector; > import org.apache.kafka.clients.consumer.ConsumerConfig; > import org.apache.kafka.clients.consumer.ConsumerRecord; > import org.apache.kafka.clients.producer.ProducerConfig; > import org.apache.kafka.common.IsolationLevel; > import java.util.*; > @Slf4j > public class ChargeJob1 { > static class RecordScheme implements > KafkaDeserializationSchema<ConsumerRecord<String, UserEvent>> { > @Override > public boolean isEndOfStream(ConsumerRecord<String, UserEvent> > stringUserEventConsumerRecord) { > return false; > } > @Override > public ConsumerRecord<String, UserEvent> > deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception { > String key = null; > UserEvent UserEvent = null; > if (consumerRecord.key() != null) { > key = new String(consumerRecord.key()); > } > if (consumerRecord.value() != null) { > UserEvent = JSON.parseObject(new String(consumerRecord.value()), > UserEvent.class); > } > return new ConsumerRecord<>( > consumerRecord.topic(), > consumerRecord.partition(), > consumerRecord.offset(), > consumerRecord.timestamp(), > consumerRecord.timestampType(), > consumerRecord.checksum(), > consumerRecord.serializedKeySize(), > consumerRecord.serializedValueSize(), > key, UserEvent); > } > @Override > public TypeInformation<ConsumerRecord<String, UserEvent>> > getProducedType() { > return TypeInformation.of(new TypeHint<ConsumerRecord<String, > UserEvent>>() { > }); > } > } > public static void main(String[] args) throws Exception { > Configuration configuration = new Configuration(); > if (args != null) { > // 传递全局参数 > configuration.setString("args", String.join(" ", args)); > } > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(3); > env.setRestartStrategy(new > RestartStrategies.FailureRateRestartStrategyConfiguration(1, Time.minutes(5), > Time.seconds(10))); > env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); > //本地checkpoint配置 > env.enableCheckpointing(1000 * 60L); > CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > // 每个检查点的间隔 > checkpointConfig.setMinPauseBetweenCheckpoints(1000 * 5L); > checkpointConfig.setCheckpointTimeout(1000 * 60L); > // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint > > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > if (SystemUtils.IS_OS_WINDOWS) { > env.setStateBackend(new > FsStateBackend("file:///soft/opt/checkpoint")); > } else { > env.setStateBackend(new > FsStateBackend("file:///soft/opt/checkpoint")); > } > // 2. 读取数据 > //kafka sink配置//kafka sink配置 > Properties sinkProperties = new Properties(); > sinkProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > "192.168.17.81:9092,192.168.17.82:9092,192.168.17.83:9092"); > sinkProperties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, > 1000 * 60 + ""); > sinkProperties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); > sinkProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, > "true"); > > sinkProperties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, > "1"); > sinkProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, > "user-event-processd-tr"); > sinkProperties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, > "org.apache.kafka.clients.producer.RoundRobinPartitioner"); > FlinkKafkaProducer<String> stringFlinkKafkaProducer = new > FlinkKafkaProducer<String>("dsp-user-event-processd", > new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), > sinkProperties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); > Properties consumerProp = new Properties(); > consumerProp.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, > "192.168.17.81:9092,192.168.17.82:9092,192.168.17.83:9092"); > consumerProp.setProperty(ConsumerConfig.GROUP_ID_CONFIG, > "dsp-user-event-group"); > consumerProp.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.StringDeserializer"); > > consumerProp.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.StringDeserializer"); > consumerProp.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, > "latest"); > consumerProp.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, > IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); > > consumerProp.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, > "org.apache.kafka.clients.consumer.RoundRobinAssignor"); > env.addSource(new FlinkKafkaConsumer<>("dsp-user-event", new > RecordScheme(), consumerProp)) > .name("dsp-user-event-source") > .keyBy(new KeySelector<ConsumerRecord<String, UserEvent>, Long>() { > @Override > public Long getKey(ConsumerRecord<String, UserEvent> value) > throws Exception { > return value.value().getUserId(); > } > }) > .process(new ChargeProcess()).setParallelism(3) > .map(obj -> obj) > .addSink(stringFlinkKafkaProducer) > .name("dsp-user-event-sink").uid("dsp-user-event-sink-uid"); > env.execute("chargeJob"); > } > public static class ChargeProcess extends KeyedProcessFunction<Long, > ConsumerRecord<String, UserEvent>, String> { > ListState<String> listState = null; > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > listState = getRuntimeContext().getListState(new > ListStateDescriptor<String>("ad-ip", String.class)); > } > @Override > public void close() throws Exception { > super.close(); > } > @Override > public void processElement(ConsumerRecord<String, UserEvent> > stringUserEventConsumerRecord, > Context context, > Collector<String> collector) throws Exception { > UserEvent value = stringUserEventConsumerRecord.value(); > Iterable<String> strings = listState.get(); > Iterator<String> iterator = strings.iterator(); > List<String> objects = new ArrayList<>(); > iterator.forEachRemaining(objects::add); > objects.add(value.getUserId() + ""); > try { > // 幂等操作即可, self process > boolean result = true; > if (result) { > listState.update(objects); > collector.collect(JSON.toJSONString(value)); > } > log.info(Thread.currentThread().getId() + ": 处理:" + > value.getEventMd5() + " " + listState.get().toString()); > System.out.println(Thread.currentThread().getId() + ": 处理:" + > value.getEventMd5() + " " + listState.get().toString()); > } catch (Exception e) { > System.out.println(e); > } > } > } > @Data > @AllArgsConstructor > @NoArgsConstructor > public static class UserEvent { > private Long userId; > private String eventMd5; > } > } > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)