Data backlog when consuming Kafka with Flink
Hi all, we have a job that uses Flink to consume Kafka data, aggregates metrics, and writes the results to Redis. The job has been restarting frequently of late; the relevant exception log is as follows:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources.
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:452) [flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$5(DefaultScheduler.java:433) [flink-dist_2.11-1.10.1.jar:1.10.1]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[na:1.8.0_121]
    at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:168) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[na:1.8.0_121]
    at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:726) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:432) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[na:1.8.0_121]
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$forward$21(FutureUtils.java:1065) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[na:1.8.0_121]
    at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:999) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
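For context on the exception above: NoResourceAvailableException is thrown when the job requests more slots than the cluster can supply before slot.request.timeout expires, for example after TaskManagers are lost during a restart or when parallelism exceeds the available slots. A minimal sketch of the knobs involved, shown with a local environment purely for illustration (all values are placeholders, not a recommendation):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSizingSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Slots offered per TaskManager; total slots across all TMs must cover
        // the job's maximum operator parallelism (with slot sharing enabled).
        conf.setInteger("taskmanager.numberOfTaskSlots", 4);
        // How long the scheduler waits before throwing the exception above (ms).
        conf.setString("slot.request.timeout", "300000");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, conf);
        env.setParallelism(4); // keep this <= total available slots
    }
}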
Question about flinkKafkaConsumer offset commits
A quick question: with checkpointing already enabled, what is the difference between flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true) and Kafka's own "enable.auto.commit"=true (the default, with a 5 s interval) at checkpoint time? The javadoc on flinkKafkaConsumer.setCommitOffsetsOnCheckpoints() reads:

/**
 * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
 *
 * This setting will only have effect if checkpointing is enabled for the job. If
 * checkpointing isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit"
 * (for 0.9+) property settings will be used.
 *
 * @return The consumer object, to allow function chaining.
 */

My reading: if checkpointing is enabled and flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true) is set (which appears to be the default), offsets are committed to Kafka at the checkpoint interval, and the auto.commit.enable setting is not used. Is that correct?
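That is indeed the behavior the javadoc describes: once checkpointing is enabled, the connector disables Kafka's auto commit and commits offsets back to Kafka only when a checkpoint completes, so the commit cadence follows the checkpoint interval. A minimal sketch (broker address, topic, and group id are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CommitOnCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are committed at this cadence

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "demo-group");          // placeholder
        // "enable.auto.commit" is ignored here because checkpointing is on.

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);
        consumer.setCommitOffsetsOnCheckpoints(true); // the default

        env.addSource(consumer).print();
        env.execute("commit-on-checkpoint-demo");
    }
}

Note that the offsets committed to Kafka serve only monitoring and group-tracking purposes; on recovery, Flink restores from the offsets stored in its own checkpointed state, not from Kafka.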
Dynamic table options dropped by the optimizer
Hi, I set dynamic options for partition reading in a temporal join, but they do not take effect; everything runs with the default parameters. I printed the execution plan: the options appear in the logical plan but are gone after optimization (see below). I configured it to load only the latest partition and to reload every 24 hours, yet the runtime logs show all partitions being loaded with a reload every hour, which are the defaults, so I suspect the dynamic options are not being applied.

== Abstract Syntax Tree ==
+- LogicalSnapshot(period=[$cor0.proctime])
   +- LogicalTableScan(table=[[ds, my_db, store_da_table, source: [HiveTableSource(store_id, store_name, merchant_id, tag_id, brand_id, tob_user_id, is_use_wallet, is_use_merchant_app, longitude, latitude, state, city, district, address, postal_code, register_phone, email, email_source, register_time, logo, banner, partner_type, commission_rate, tax_rate, service_fee, min_spend, delivery_distance, preparation_time, contact_phone, store_status, closed_start_time, closed_end_time, effective_closed_end_time, auto_confirmed, auto_confirmed_enabled, create_time, update_time, rating_total, rating_score, opening_status, surcharge_intervals, service_charge_fee_rate, driver_modify_order_enabled, delivery_distance_mode, business_info_added, mtime, dt, grass_region) TablePath: my_db.store_da_table, PartitionPruned: false, PartitionNums: null], dynamic options: {streaming-source.enable=true, streaming-source.monitor-interval=24 h, streaming-source.partition.include=latest}]])

== Optimized Logical Plan ==
Calc(select=[_UTF-16LE'v4' AS version, _UTF-16LE'ID' AS country, city, id, event_time, operation, platform, payment_method, gmv, 0.0:DECIMAL(2, 1) AS gmv_usd], where=[NOT(LIKE(UPPER(store_name), _UTF-16LE'%[TEST]%'))])
+- LookupJoin(table=[ds.my_db.store_da_table], joinType=[LeftOuterJoin], async=[false], lookup=[store_id=store_id], select=[city, id, event_time, operation, platform, payment_method, gmv, store_id, store_id, store_name])
   +- Union(all=[true], union=[city, id, event_time, operation, platform, payment_method, gmv, store_id])
      :- Calc(select=[delivery_city AS city, id, /(CAST(create_time), 1000) AS event_time, CASE(OR(=(order_status, 440), =(order_status, 800)), _UTF-16LE'NET':VARCHAR(5) CHARACTER SET "UTF-16LE", _UTF-16LE'GROSS':VARCHAR(5) CHARACTER SET "UTF-16LE") AS operation, _UTF-16LE'' AS platform, payment_method, /(CAST(total_amount), 10) AS gmv, CAST(store_id) AS store_id])
      :  +- DataStreamScan(table=[[ds, keystats, main_db__transaction_tab]], fields=[id, delivery_city, store_id, create_time, payment_time, order_status, payment_method, total_amount, proctime], reuse_id=[1])
      +- Calc(select=[delivery_city AS city, id, /(CAST(payment_time), 1000) AS event_time, _UTF-16LE'NET':VARCHAR(5) CHARACTER SET "UTF-16LE" AS operation, _UTF-16LE'AIRPAY' AS platform, payment_method, /(CAST(total_amount), 10) AS gmv, CAST(store_id) AS store_id], where=[OR(=(order_status, 440), =(order_status, 800))])
         +- Reused(reference_id=[1])
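For reference, this is how dynamic table options are normally supplied and enabled; a minimal sketch, in which the streaming table, its fields, and the join key are placeholders (the hive table and options mirror the plan above). The feature flag must be switched on, otherwise OPTIONS hints are rejected:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DynamicOptionsHintSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Dynamic table options are disabled by default.
        tEnv.getConfig().getConfiguration()
            .setBoolean("table.dynamic-table-options.enabled", true);

        // The hint must sit directly after the table reference it targets.
        tEnv.executeSql(
            "SELECT o.*, s.store_name "
            + "FROM orders AS o "                    // placeholder streaming table
            + "JOIN store_da_table /*+ OPTIONS("
            + "'streaming-source.enable'='true',"
            + "'streaming-source.monitor-interval'='24 h',"
            + "'streaming-source.partition.include'='latest') */ "
            + "FOR SYSTEM_TIME AS OF o.proctime AS s "
            + "ON o.store_id = s.store_id");
    }
}

Since the logical plan above already shows the dynamic options on the table scan, they were at least parsed; it may be worth checking whether the Flink version in use propagates them through the lookup-join optimization.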
Re: Exactly-once across multiple complex operators
Every operator needs to maintain state for it.
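For reference, a minimal sketch of enabling exactly-once checkpointing so that every stateful operator participates in the same consistent snapshot (the interval is a placeholder):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // All operators align on the same checkpoint barriers, and their state is
        // restored together on failure, giving exactly-once semantics inside the pipeline.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        // ... build the pipeline. End-to-end exactly-once additionally requires
        // a transactional or idempotent sink.
    }
}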
Flink SQL: a job with wrong Kafka connection info prevents the whole session cluster from deploying jobs normally
Environment: Flink SQL 1.12.2, Kubernetes session mode.

Description: when the Kafka port is wrong, after a while the following error appears:

2021-04-25 16:49:50
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition filebeat_json_install_log-3 could be determined

When the Kafka IP is wrong, after a while the following error appears:

2021-04-25 20:12:53
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

Then, stopping or cancelling the job produces the following:

2021-04-25 08:53:41,115 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job v2_ods_device_action_log (fcc451b8a521398b10e5b86153141fbf) switched from state CANCELLING to CANCELED.
2021-04-25 08:53:41,115 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job fcc451b8a521398b10e5b86153141fbf.
2021-04-25 08:53:41,115 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
2021-04-25 08:53:41,115 INFO org.apache.flink.runtime.checkpoint.CompletedCheckpoint [] - Checkpoint with ID 1 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-1' not discarded.
2021-04-25 08:53:41,115 INFO org.apache.flink.runtime.checkpoint.CompletedCheckpoint [] - Checkpoint with ID 2 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-2' not discarded.
2021-04-25 08:53:41,116 INFO org.apache.flink.runtime.checkpoint.CompletedCheckpoint [] - Checkpoint with ID 3 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-3' not discarded.
2021-04-25 08:53:41,137 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job fcc451b8a521398b10e5b86153141fbf reached globally terminal state CANCELED.
2021-04-25 08:53:41,148 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job v2_ods_device_action_log(fcc451b8a521398b10e5b86153141fbf).
2021-04-25 08:53:41,151 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending SlotPool.
2021-04-25 08:53:41,151
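One note on the trace above: "Recovery is suppressed by NoRestartBackoffTimeStrategy" means no restart strategy is configured, so the first task failure sends the job straight to a terminal state. This does not by itself explain the cluster-wide symptom, but for the per-job failure a bounded restart strategy is the usual mitigation; a minimal sketch (attempt count and delay are placeholders):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Retry a failing job at most 3 times, 10 s apart, instead of failing immediately.
        env.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
        // For pure SQL jobs the equivalent cluster-level keys in flink-conf.yaml are
        // restart-strategy, restart-strategy.fixed-delay.attempts and
        // restart-strategy.fixed-delay.delay.
    }
}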
Re: Question about Kafka offsets in Flink SQL metrics
Thank you very much!

> On 2021-04-25, at 19:19, JasonLee <17610775...@163.com> wrote:
>
> hi
>
> currentOffsets is the consumer's current read offset for each partition; committedOffsets is the offset last successfully committed to Kafka. Since Kafka partition offsets start at 0, committedOffsets is 1 greater than currentOffsets.
>
> -
> Best Wishes
> JasonLee
Re: Question about Kafka offsets in Flink SQL metrics
hi

currentOffsets is the consumer's current read offset for each partition; committedOffsets is the offset last successfully committed to Kafka. Since Kafka partition offsets start at 0, committedOffsets is 1 greater than currentOffsets.

-
Best Wishes
JasonLee
Re: Re: Submitting a Flink SQL Kafka table fails with "cannot load user class"
hi

From the error it looks like the flink-sql-kafka connector jar is missing; add it to Flink/lib and run again.

-
Best Wishes
JasonLee
Re: How to extract a custom data structure from a stream in Flink and assign it to a variable
Can a side output be used to pull the value out on its own? How would I read the value that way?

JasonLee <17610775...@163.com> wrote on Sun, 2021-04-25 at 17:58:

> hi
>
> You can use filter to split out multiple streams, or split the processing with side outputs.
>
> -
> Best Wishes
> JasonLee
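For what it's worth, a minimal side-output sketch (the tag name and record format are made up for illustration). Note the values are still consumed inside operators or sinks, not assigned to a driver-side variable:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    // Hypothetical tag for the records we want to peel off the main stream.
    static final OutputTag<String> CONTROL = new OutputTag<String>("control") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("data:1", "ctrl:reload", "data:2");

        SingleOutputStreamOperator<String> main =
            source.process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) {
                    if (value.startsWith("ctrl:")) {
                        ctx.output(CONTROL, value); // route to the side output
                    } else {
                        out.collect(value);         // stays on the main stream
                    }
                }
            });

        main.getSideOutput(CONTROL).print("side"); // consume the extracted values here
        main.print("main");
        env.execute("side-output-demo");
    }
}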
Re: Re: Submitting a Flink SQL Kafka table fails with "cannot load user class"
Hello, the jar flink-sql-connector-kafka_2.11-1.11.3.jar is already in Flink's lib directory.

maker_d...@foxmail.com

From: JasonLee
Sent: 2021-04-25 17:56
To: user-zh
Subject: Re: Submitting a Flink SQL Kafka table fails with "cannot load user class"
hi
From the error it looks like the flink-sql-kafka connector jar is missing; add it to Flink/lib and run again.
-
Best Wishes
JasonLee
Re: How to extract a custom data structure from a stream in Flink and assign it to a variable
hi

You can use filter to split out multiple streams, or split the processing with side outputs.

-
Best Wishes
JasonLee
Re: Submitting a Flink SQL Kafka table fails with "cannot load user class"
hi

From the error it looks like the flink-sql-kafka connector jar is missing; add it to Flink/lib and run again.

-
Best Wishes
JasonLee
Submitting a Flink SQL Kafka table fails with "cannot load user class"
Hello everyone, I want to read a MySQL table with Flink CDC and send it to a Kafka table. When I insert data into the Kafka table from the sql-client, I get the following error:

2021-04-25 17:21:03
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:590)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:384)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
    file: '/data/yarn/nm/usercache/flink/appcache/application_1618375297719_0009/blobStore-656e7e03-d94c-4861-b492-aeca2e5b4218/job_3682b0f430839794beb0d09e8e53b416/blob_p-e79c4e89fbdd13c78a3a0602a35a8c6f2ab35ebc-2f20c3259bf505db1bb258562da113c0' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at
How to extract a custom data structure from a stream in Flink and assign it to a variable
Flink version 1.12.2. The requirement: take a custom data structure (call it a) out of the stream and assign it to a variable, then run some checks against a's fields. For example:

val ds: DataStream[b] = stream.filter(_.nonEmpty).map(new MapFunction[String, b] {
  override def map(value: String) = {
    val recallKafka = JSON.parseObject(value, classOf[a])
    b(recallKafka.group_id, value, recallKafka.eventTime)
  }
})

val kafkaCommonData: a = recallKafka   // the checks are based on this
if (kafkaCommonData.data.date != null) { x }
if (kafkaCommonData.data.userinfo != null) { }
...

How can I pull a particular data structure out of the stream on its own, and if there is a way, how should it be written? Could someone take a look? I've been stuck on this for days.
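A stream element generally can't be pulled out into a driver-side variable; the checks have to run inside an operator, where the parsed object is in scope. A minimal sketch with hypothetical POJOs mirroring a and b above (the parse call stands in for the fastjson call in the snippet):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public class ExtractInsideOperatorSketch {
    public static DataStream<B> toB(DataStream<String> stream) {
        return stream
            .filter(s -> !s.isEmpty())
            .flatMap((FlatMapFunction<String, B>) (value, out) -> {
                A recallKafka = parse(value); // e.g. JSON.parseObject(value, A.class)
                // The per-record checks happen right here, where recallKafka is in scope:
                if (recallKafka.data != null && recallKafka.data.date != null) {
                    out.collect(new B(recallKafka.groupId, value, recallKafka.eventTime));
                }
            })
            .returns(B.class); // type hint needed with the lambda
    }

    static A parse(String value) { return new A(); } // placeholder parser

    public static class A { public Data data; public String groupId; public long eventTime; }
    public static class Data { public String date; public String userinfo; }
    public static class B { public B(String groupId, String raw, long eventTime) {} }
}

If the value is needed by a different stream rather than at a single operator, the usual patterns are a side output (as in the other thread) or a broadcast stream, not a plain variable.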
Flink SQL using CDC to sync PostgreSQL data to ES fails: org.postgresql.util.PSQLException: ERROR: out of memory
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:150) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:140) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:113) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_202]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_202]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
Caused by: org.postgresql.util.PSQLException: ERROR: out of memory
  Detail: Cannot enlarge string buffer containing 1073741350 bytes by 525 more bytes.
  Where: slot "xxx_xxx", output plugin "wal2json", in the change callback, associated LSN 690/69ABCE18
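The error comes from the PostgreSQL server side: wal2json serializes a whole transaction as one JSON document, and a large transaction can overflow PostgreSQL's roughly 1 GB string buffer. One commonly suggested mitigation is switching to a streaming decoding plugin; a minimal sketch, assuming the postgres-cdc connector's decoding.plugin.name option and that the wal2json_streaming plugin is installed on the server (schema and connection values are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PostgresCdcPluginSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
            "CREATE TABLE pg_source ("
            + "  id INT, name STRING"                 // placeholder schema
            + ") WITH ("
            + "  'connector' = 'postgres-cdc',"
            + "  'hostname' = 'localhost',"           // placeholder connection info
            + "  'port' = '5432',"
            + "  'username' = 'postgres',"
            + "  'password' = '***',"
            + "  'database-name' = 'mydb',"
            + "  'schema-name' = 'public',"
            + "  'table-name' = 'my_table',"
            // Streaming variant of wal2json: emits changes incrementally instead of
            // buffering the whole transaction in one string.
            + "  'decoding.plugin.name' = 'wal2json_streaming'"
            + ")");
    }
}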
Question about Kafka offsets in Flink SQL metrics
After submitting a Flink SQL job, I see two Kafka offset metrics in the JobManager dashboard, currentOffsets and committedOffsets. With no new data arriving in Kafka and the job running for a while, the dashboard shows currentOffsets: 2897 and committedOffsets: 2898, and neither value changes (the data has presumably all been consumed). What puzzles me: why do the two offset values still differ? Does committedOffsets mean the offset that has been committed and saved in state? And what exactly does currentOffsets mean? Any pointers appreciated, thanks!