Data backlog when consuming Kafka with Flink

2021-04-25 Post by Back moon
Hi all,
We have a job that uses Flink to consume Kafka data, aggregates metrics, and writes them to Redis. The job has been restarting frequently of late; the relevant exception log is below (a note on the resource configuration follows the trace):

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources.
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:452) [flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$5(DefaultScheduler.java:433) [flink-dist_2.11-1.10.1.jar:1.10.1]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[na:1.8.0_121]
    at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:168) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[na:1.8.0_121]
    at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:726) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:432) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[na:1.8.0_121]
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$forward$21(FutureUtils.java:1065) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[na:1.8.0_121]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[na:1.8.0_121]
    at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:999) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.10.1.jar:1.10.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
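
For reference: this exception means the JobMaster could not obtain a task slot before the slot request timed out, which usually indicates the job needs more slots than the cluster has free (or that TaskManagers are dying and not coming back). A sketch of the flink-conf.yaml settings involved; the values here are illustrative assumptions, not recommendations:

slot.request.timeout: 600000        # ms to wait for a slot before this exception (default 300000)
taskmanager.numberOfTaskSlots: 4    # slots offered by each TaskManager
parallelism.default: 4              # the job's total slot demand must fit the cluster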

A question about flinkKafkaConsumer offset commits

2021-04-25 Post by lp
A question: with checkpointing already enabled, what is the difference at checkpoint time between flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true); and Kafka's own "enable.auto.commit"=true (which is the default, with interval=5s)?

The Javadoc on the flinkKafkaConsumer.setCommitOffsetsOnCheckpoints() method reads as follows:

/**
 * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
 *
 * This setting will only have effect if checkpointing is enabled for the job. If
 * checkpointing isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit"
 * (for 0.9+) property settings will be used.
 *
 * @return The consumer object, to allow function chaining.
 */

My understanding: if checkpointing is enabled and flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true) is set (which appears to be the default), offsets are committed to Kafka on the checkpoint interval and the auto.commit.enable configuration is not used. Is that correct?
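
For reference, a minimal sketch contrasting the two commit paths (the topic, group, and broker address are made up). With checkpointing enabled and setCommitOffsetsOnCheckpoints(true) -- the default -- offsets are committed back to Kafka only when a checkpoint completes, and the enable.auto.commit property is ignored:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // with checkpointing on, commits follow this interval

val props = new Properties()
props.setProperty("bootstrap.servers", "kafka:9092") // hypothetical broker
props.setProperty("group.id", "my-group")            // hypothetical group
// Consulted only when checkpointing is NOT enabled:
props.setProperty("enable.auto.commit", "true")
props.setProperty("auto.commit.interval.ms", "5000")

val consumer = new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), props)
consumer.setCommitOffsetsOnCheckpoints(true) // only takes effect when checkpointing is enabled

env.addSource(consumer).print()
env.execute("offset-commit-sketch")

Either way, the offsets committed to Kafka are only used for group monitoring and as a starting point for fresh runs; Flink's own failure recovery restores from the offsets stored in its checkpoints.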





Dynamic table options are removed by the optimizer

2021-04-25 Post by macia kk
Hi

When using a temporal join I set dynamic options that control which partitions are read, but they never take effect; everything runs with the default parameters. I printed the execution plans: the options are present in the logical plan but gone after optimization.
As shown below, I configured the source to load only the latest partition, reloading every 24 hours, yet the runtime logs show all partitions being loaded, with a reload every hour; those are the defaults, so I suspect the dynamic options are not being applied. (A sketch of the hint setup follows, before the plans.)
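
For reference, a minimal sketch of how the hint is wired up (the table registration via a Hive catalog is assumed; the option values are copied from the plan below). One thing worth ruling out: table.dynamic-table-options.enabled defaults to false and must be set to true before OPTIONS hints are accepted at all:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// OPTIONS hints are rejected unless this is switched on (default: false).
tEnv.getConfig.getConfiguration.setBoolean("table.dynamic-table-options.enabled", true)

tEnv.executeSql(
  """
    |SELECT *
    |FROM store_da_table
    |  /*+ OPTIONS('streaming-source.enable' = 'true',
    |              'streaming-source.monitor-interval' = '24 h',
    |              'streaming-source.partition.include' = 'latest') */
    |""".stripMargin)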


== Abstract Syntax Tree ==
+- LogicalSnapshot(period=[$cor0.proctime])
   +- LogicalTableScan(table=[[ds, my_db, store_da_table, source: [HiveTableSource(store_id, store_name, merchant_id, tag_id, brand_id, tob_user_id, is_use_wallet, is_use_merchant_app, longitude, latitude, state, city, district, address, postal_code, register_phone, email, email_source, register_time, logo, banner, partner_type, commission_rate, tax_rate, service_fee, min_spend, delivery_distance, preparation_time, contact_phone, store_status, closed_start_time, closed_end_time, effective_closed_end_time, auto_confirmed, auto_confirmed_enabled, create_time, update_time, rating_total, rating_score, opening_status, surcharge_intervals, service_charge_fee_rate, driver_modify_order_enabled, delivery_distance_mode, business_info_added, mtime, dt, grass_region) TablePath: my_db.store_da_table, PartitionPruned: false, PartitionNums: null], dynamic options: {streaming-source.enable=true, streaming-source.monitor-interval=24 h, streaming-source.partition.include=latest}]])

== Optimized Logical Plan ==
Calc(select=[_UTF-16LE'v4' AS version, _UTF-16LE'ID' AS country, city, id, event_time, operation, platform, payment_method, gmv, 0.0:DECIMAL(2, 1) AS gmv_usd], where=[NOT(LIKE(UPPER(store_name), _UTF-16LE'%[TEST]%'))])
+- LookupJoin(table=[ds.my_db.store_da_table], joinType=[LeftOuterJoin], async=[false], lookup=[store_id=store_id], select=[city, id, event_time, operation, platform, payment_method, gmv, store_id, store_id, store_name])
   +- Union(all=[true], union=[city, id, event_time, operation, platform, payment_method, gmv, store_id])
      :- Calc(select=[delivery_city AS city, id, /(CAST(create_time), 1000) AS event_time, CASE(OR(=(order_status, 440), =(order_status, 800)), _UTF-16LE'NET':VARCHAR(5) CHARACTER SET "UTF-16LE", _UTF-16LE'GROSS':VARCHAR(5) CHARACTER SET "UTF-16LE") AS operation, _UTF-16LE'' AS platform, payment_method, /(CAST(total_amount), 10) AS gmv, CAST(store_id) AS store_id])
      :  +- DataStreamScan(table=[[ds, keystats, main_db__transaction_tab]], fields=[id, delivery_city, store_id, create_time, payment_time, order_status, payment_method, total_amount, proctime], reuse_id=[1])
      +- Calc(select=[delivery_city AS city, id, /(CAST(payment_time), 1000) AS event_time, _UTF-16LE'NET':VARCHAR(5) CHARACTER SET "UTF-16LE" AS operation, _UTF-16LE'AIRPAY' AS platform, payment_method, /(CAST(total_amount), 10) AS gmv, CAST(store_id) AS store_id], where=[OR(=(order_status, 440), =(order_status, 800))])
         +- Reused(reference_id=[1])


Re: Guaranteeing exactly-once across multiple complex operators

2021-04-25 Post by hk__lrzy
All of the operators need to maintain it (see the sketch below).
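
For context, a minimal sketch (the pipeline itself is made up) of enabling exactly-once checkpointing, which is what makes every operator in the graph participate in the guarantee -- checkpoint barriers flow through all operators, and each stateful one snapshots its state:

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)

env.fromElements(1, 2, 3, 4)
  .keyBy(_ % 2)   // keyed state below is covered by the checkpoint
  .reduce(_ + _)  // stateful operator participating in the snapshot
  .print()
env.execute("exactly-once-sketch")

Note that end-to-end exactly-once additionally requires a replayable source and a transactional or idempotent sink.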





Flink SQL: a job with wrong Kafka connection info prevents the whole session cluster from deploying jobs normally

2021-04-25 Post by chenxuying
Environment:

flinksql 1.12.2

k8s session mode

Description:

When the Kafka port is wrong, the following error appears after a while:

2021-04-25 16:49:50

org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition filebeat_json_install_log-3 could be determined

When the Kafka IP is wrong, the following error appears after a while:

2021-04-25 20:12:53

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata







Then, when the job is stopped or cancelled, the following errors appear (a note on restart strategies follows the log excerpt):

2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job v2_ods_device_action_log (fcc451b8a521398b10e5b86153141fbf) switched from state CANCELLING to CANCELED.
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job fcc451b8a521398b10e5b86153141fbf.
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint with ID 1 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-1' not discarded.
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint with ID 2 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-2' not discarded.
2021-04-25 08:53:41,116 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint with ID 3 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-3' not discarded.
2021-04-25 08:53:41,137 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job fcc451b8a521398b10e5b86153141fbf reached globally terminal state CANCELED.
2021-04-25 08:53:41,148 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job v2_ods_device_action_log(fcc451b8a521398b10e5b86153141fbf).
2021-04-25 08:53:41,151 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending SlotPool.
2021-04-25 08:53:41,151
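
For context on the "Recovery is suppressed by NoRestartBackoffTimeStrategy" lines above: that strategy means the job runs with restart strategy "none", so the first connector failure terminally fails the job. A sketch of setting an explicit strategy programmatically (the attempt count and delay are made-up values):

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Retry a failed job up to 3 times with a 10-second pause between attempts,
// instead of suppressing recovery entirely.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))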

Re: Question about Kafka offsets in Flink SQL metrics

2021-04-25 Post by 占英华
Many thanks!

> On April 25, 2021, at 19:19, JasonLee <17610775...@163.com> wrote:
> 
> hi
> 
> currentOffsets is the consumer's current read offset for each partition; committedOffsets
> is the offset most recently committed back to Kafka. Since Kafka partition offsets start
> from 0, committedOffsets is 1 greater than currentOffsets
> 
> 
> 
> -
> Best Wishes
> JasonLee



Re: Question about Kafka offsets in Flink SQL metrics

2021-04-25 Post by JasonLee
hi

currentOffsets is the consumer's current read offset for each partition; committedOffsets is the offset most recently committed back to Kafka. Since Kafka partition offsets start from 0, committedOffsets is 1 greater than currentOffsets (a worked example follows below).
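
A tiny worked example of the off-by-one (the values are taken from this thread; the +1 follows Kafka's convention that a committed offset names the next record to fetch):

// currentOffsets reports the last record read; the committed offset
// points one past it, at the next record the consumer would fetch.
val currentOffset = 2897L
val committedOffset = currentOffset + 1
assert(committedOffset == 2898L)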



-
Best Wishes
JasonLee




Re: How to extract a custom data structure from a Flink stream and assign it to a variable

2021-04-25 Post by 张锴
Can the side-output approach extract the value on its own? How would I actually get the value that way?

JasonLee <17610775...@163.com> wrote on Sunday, April 25, 2021, at 17:58:

> hi
>
> You can use filter to split off multiple streams, or use side outputs to split the stream for separate processing
>
>
>
> -
> Best Wishes
> JasonLee
>


Re: Re: Submitting a Flink SQL Kafka table fails with "cannot load user class"

2021-04-25 Post by maker_d...@foxmail.com
Hello,
The flink-sql-connector-kafka_2.11-1.11.3.jar package is already in Flink's lib directory.



maker_d...@foxmail.com
 
From: JasonLee
Sent: 2021-04-25 17:56
To: user-zh
Subject: Re: Submitting a Flink SQL Kafka table fails with "cannot load user class"
hi

From the error it looks like the flink-sql-kafka package is missing; add that jar under Flink/lib and try running again
 
 
 
-
Best Wishes
JasonLee


Re: How to extract a custom data structure from a Flink stream and assign it to a variable

2021-04-25 Post by JasonLee
hi

You can use filter to split off multiple streams, or use side outputs to split the stream for separate processing (a minimal sketch follows below).
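
A minimal sketch of the side-output approach (the tag name and the stand-in input are made up): a ProcessFunction keeps the main records flowing while routing selected values to a tagged side stream, which is then a regular DataStream you can consume on its own:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[String] = env.fromElements("a", "", "b") // stand-in input

val sideTag = OutputTag[String]("side-values") // hypothetical tag

val main: DataStream[String] = stream.process(
  new ProcessFunction[String, String] {
    override def processElement(value: String,
                                ctx: ProcessFunction[String, String]#Context,
                                out: Collector[String]): Unit =
      if (value.nonEmpty) out.collect(value) // stays on the main stream
      else ctx.output(sideTag, value)        // routed to the side output
  })

val side: DataStream[String] = main.getSideOutput(sideTag) // extract it here
side.print()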



-
Best Wishes
JasonLee


Re: Submitting a Flink SQL Kafka table fails with "cannot load user class"

2021-04-25 Post by JasonLee
hi

From the error it looks like the flink-sql-kafka package is missing; add that jar under Flink/lib and try running again



-
Best Wishes
JasonLee


Submitting a Flink SQL Kafka table fails with "cannot load user class"

2021-04-25 Post by maker_d...@foxmail.com
Hello everyone,
I want to read a MySQL table with flink-cdc and write it to a Kafka table.
When I insert data into the Kafka table from the sql-client, I get the following error:

2021-04-25 17:21:03
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:590)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:384)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
file: '/data/yarn/nm/usercache/flink/appcache/application_1618375297719_0009/blobStore-656e7e03-d94c-4861-b492-aeca2e5b4218/job_3682b0f430839794beb0d09e8e53b416/blob_p-e79c4e89fbdd13c78a3a0602a35a8c6f2ab35ebc-2f20c3259bf505db1bb258562da113c0' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at

How to extract a custom data structure from a Flink stream and assign it to a variable

2021-04-25 Post by 张锴
We are on Flink 1.12.2. I need to take a custom data structure, call it a, out of the stream, assign it to a variable, and then use a's fields for some conditional checks.
For example:

   val ds: DataStream[b] = stream.filter(_.nonEmpty).map(new MapFunction[String, b] {
     override def map(value: String) = {
       val recallKafka = JSON.parseObject(value, classOf[a])
       b(recallKafka.group_id, value, recallKafka.eventTime)
     }
   })

   // what I want, but recallKafka only exists inside map():
   val kafkaCommonData: a = recallKafka

The checks I need to run:

   if (kafkaCommonData.data.date != null) { x }
   if (kafkaCommonData.data.userinfo != null) { }
   ...

How can I pull a particular data structure out of the stream on its own, and if there is a way, how should it be written? Could someone take a look? I have been stuck on this for days.




Flink SQL using CDC to sync PostgreSQL data to ES fails with org.postgresql.util.PSQLException: ERROR: out of memory

2021-04-25 Post by william
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:150) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:140) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:113) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_202]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_202]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
Caused by: org.postgresql.util.PSQLException: ERROR: out of memory
  Detail: Cannot enlarge string buffer containing 1073741350 bytes by 525 more bytes.
  Where: slot "xxx_xxx", output plugin "wal2json", in the change callback, associated LSN 690/69ABCE18





Question about Kafka offsets in Flink SQL metrics

2021-04-25 Post by 邮件帮助中心
After submitting a Flink SQL job, I found two Kafka offset metrics in the JobManager monitoring: currentOffsets and committedOffsets. With no new data arriving in Kafka, after the program had run for a while the metrics dashboard showed
currentOffsets: 2897
committedOffsets: 2898
and these two values no longer change (presumably all the data has been consumed). My questions: why are the two offsets still different? Does committedOffsets mean the offset that has been committed and saved in state? And what exactly does currentOffsets mean? Any guidance appreciated, thanks!