1. It always depends on the data volume per user. A million users is not
much if you compare it to the biggest Flink installations (Netflix,
Alibaba, Pinterest, ...). However, for larger scale and better scalability,
I'd recommend using the RocksDB state backend. [1]
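For reference, switching to RocksDB is a configuration change; a minimal sketch of the relevant flink-conf.yaml entries (the checkpoint directory is a placeholder you would replace with your own durable storage):

```yaml
# Store operator state in embedded RocksDB (can spill to local disk) instead of heap
state.backend: rocksdb
# Durable location for checkpoint data -- placeholder path, use your own DFS/S3
state.checkpoints.dir: hdfs:///flink/checkpoints
# Optional: incremental checkpoints keep checkpoint sizes manageable at scale
state.backend.incremental: true
```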
2. Are you referring to statefun? I'd
My Flink job failed to checkpoint with a "The job has failed" error. The
logs contained no other recent errors. I keep hitting the error even if I
cancel the jobs and restart them. When I restarted my jobmanager and
taskmanager, the error went away.
What error am I hitting? It looks like
Hi, maverick.
The watermark is used to determine whether a message is late or early. If we
only used the watermark on the versioned table side, we would have no way to
determine whether the event in the main stream is ready to emit.
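To illustrate with a toy model (this is not Flink's internal implementation): a main-stream row at event time t can only be emitted once both watermarks have passed t, because either side advancing alone leaves the join result undecided.

```python
# Toy model of event-time temporal join readiness (not Flink internals):
# a main-stream row at event time t is ready only when BOTH the main stream
# and the versioned table have watermarked past t, i.e. no earlier rows or
# version updates can still arrive on either side.
def ready_to_emit(row_time: int, main_wm: int, versioned_wm: int) -> bool:
    return min(main_wm, versioned_wm) >= row_time

print(ready_to_emit(100, main_wm=150, versioned_wm=90))   # False: table side lags
print(ready_to_emit(100, main_wm=150, versioned_wm=120))  # True: both sides past t
```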
Best,
Shengkai
maverick wrote on Mon, Apr 26, 2021 at 2:31 AM:
> Hi,
> I'm curious why
Hi all,
We have a job that uses Flink to consume Kafka data and write aggregated metrics to Redis. The job has been restarting frequently lately; the relevant exception log is as follows:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate the required slot within slot request timeout.
Please make sure that the cluster has enough resources.
at
It seems that the JobListener interface does not expose such
information. Maybe you can set the RuleId as the jobName (or a suffix
of the jobName) of the application; then you can get the mapping of
jobId to jobName (RuleId) through /jobs/overview.
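A small sketch of that suffix idea, with a hypothetical /jobs/overview response (job names and ids are made up; in practice you would fetch the JSON from the REST endpoint):

```python
import json

# Hypothetical response body from Flink's REST endpoint /jobs/overview,
# assuming each job name was set to "<app-name>-<RuleId>" at submission time.
sample = json.loads("""
{"jobs": [
  {"jid": "a1b2c3", "name": "ranking-app-rule-42", "state": "RUNNING"},
  {"jid": "d4e5f6", "name": "ranking-app-rule-7",  "state": "RUNNING"}
]}
""")

def rule_id_to_job_id(overview: dict) -> dict:
    """Map the RuleId suffix of each job name back to its Flink jobId."""
    mapping = {}
    for job in overview["jobs"]:
        rule_id = job["name"].rsplit("-", 1)[-1]  # suffix after the last '-'
        mapping[rule_id] = job["jid"]
    return mapping

print(rule_id_to_job_id(sample))  # {'42': 'a1b2c3', '7': 'd4e5f6'}
```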
[1]
A question: what is the difference at checkpoint time between flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true)
and Kafka's own "enable.auto.commit"=true (which is the default, with
interval=5s), given that I have already enabled checkpointing?
The Javadoc on flinkKafkaConsumer.setCommitOffsetsOnCheckpoints() reads:
/**
* Specifies
No. But I decided to disable it finally
On Sun, Apr 25, 2021 at 5:14 AM, Yun Gao wrote:
> Hi John,
>
> Logically the maximum retained checkpoints are configured
> by state.checkpoints.num-retained [1]. Have you configured
> this option?
>
>
> Best,
> Yun
>
> [1]
>
Hi Eddie,
I have tried your program with the following changes and it could execute
successfully:
- Replace
`rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"`
with
Can a Kafka Consumer Source have more tasks run in parallel than the number
of partitions for the topic it is the source of? Or is the max parallelism
of the source constrained by max partitions of the topic?
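Flink does allow a Kafka source's parallelism to exceed the topic's partition count; the surplus subtasks simply receive no partitions and sit idle. A sketch of the distribution (not Flink's exact assigner, just the round-robin idea):

```python
# Sketch (not Flink's exact partition assigner) of distributing N topic
# partitions over P source subtasks: when P > N, some subtasks receive
# no partition at all and stay idle.
def assign(partitions: int, parallelism: int) -> dict:
    tasks = {i: [] for i in range(parallelism)}
    for p in range(partitions):
        tasks[p % parallelism].append(p)  # round-robin over subtasks
    return tasks

print(assign(partitions=3, parallelism=5))
# {0: [0], 1: [1], 2: [2], 3: [], 4: []} -- subtasks 3 and 4 sit idle
```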
1. What if there is a very high number of users, like a million customers;
won't the service crash? Is it advisable to hold the data in memory?
2. What if stateful functions are used to calculate the value? How will this
approach differ from the one proposed below?
Regards,
Swagat
On Wed, Apr
Hey Yang, Community
As discussed a few weeks ago, I'm working on the Application Cluster - Native
K8s approach, running Flink 1.12.2.
We deploy application clusters programmatically, which works well.
In addition, we leverage the Kubernetes client (Fabric8io) to watch the
deployment/pod status and
Hi,
I'm curious why Event Time Temporal Join needs watermarks from both sides to
perform join.
Shouldn't watermark on versioned table side be enough to perform join ?
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi
When using a temporal join I set dynamic options related to partition reading,
but in the end they did not take effect; everything ran with the default parameters.
I printed the execution plan: the options are present in the logical plan but are
gone after optimization. As shown below, I configured loading the latest partition,
reloading every 24 hours, yet the runtime logs show all partitions being loaded,
once per hour, which are the defaults; so I suspect the dynamic
options are not taking effect.
== Abstract Syntax Tree ==
+- LogicalSnapshot(period=[$cor0.proctime])
Hi Dian,
I tried your suggestion but got the same error message, unfortunately. I
also tried file:/ and file:// with the same error; not sure what's going
on. I assume writing to Avro works fine in Java and Scala?
Eddie
On Sat, Apr 24, 2021 at 10:03 PM Dian Fu wrote:
> I guess you only need
>
> Now, I'm just worried about the state size. State size will grow forever.
> There is no TTL.
The potential for unbounded state is certainly a problem, and it's going to
be a problem no matter how you implement the deduplication. Standard
techniques for mitigating this include (1) limiting
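As one illustration of bounding state with a time-to-live (a hand-rolled sketch, not Flink's state TTL API): evict a key once it has been idle longer than a configured period, trading exact deduplication for bounded state.

```python
import time

class TtlDeduplicator:
    """Keyed de-dup whose state is bounded by evicting entries older than ttl."""
    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self.seen = {}  # key -> timestamp of last sighting

    def is_duplicate(self, key, now=None) -> bool:
        now = time.monotonic() if now is None else now
        # Evict expired entries so the state does not grow forever.
        self.seen = {k: t for k, t in self.seen.items() if now - t < self.ttl}
        duplicate = key in self.seen
        self.seen[key] = now
        return duplicate

dedup = TtlDeduplicator(ttl_seconds=60.0)
print(dedup.is_duplicate("evt-1", now=0.0))    # False: first sighting
print(dedup.is_duplicate("evt-1", now=10.0))   # True: seen within the TTL
print(dedup.is_duplicate("evt-1", now=100.0))  # False: entry already expired
```

The trade-off is visible in the last line: after the TTL a repeated event is treated as new, which is exactly the price paid for keeping state bounded.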
Environment:
flinksql 1.12.2
k8s session mode
Description:
I got the following error log when my Kafka connector port was wrong:
>
2021-04-25 16:49:50
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired
before the position for partition filebeat_json_install_log-3
All operators need to be maintained.
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Environment:
flinksql 1.12.2
k8s session mode
Description:
When the Kafka port is wrong, the following error appears after a while:
2021-04-25 16:49:50
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired
before the position for partition filebeat_json_install_log-3 could be
determined
When the Kafka IP is wrong, the following error appears after a while:
2021-04-25 20:12:53
Thank you very much!
> On Apr 25, 2021, at 19:19, JasonLee <17610775...@163.com> wrote:
>
> hi
>
> currentOffsets: the consumer's current read offset for each partition; committedOffsets:
> the offset most recently committed successfully to Kafka. Because Kafka partition
> offsets start at 0, committedOffsets will be 1 greater than currentOffsets.
>
>
>
> -
> Best Wishes
> JasonLee
> --
hi
currentOffsets: the consumer's current read offset for each partition; committedOffsets:
the offset most recently committed successfully to Kafka. Because Kafka partition
offsets start at 0, committedOffsets will be 1 greater than currentOffsets.
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/
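The off-by-one follows Kafka's convention that a committed offset names the next record to read rather than the last record read; a minimal model:

```python
# Kafka convention: the committed offset is the position of the NEXT record
# to consume, while a "current offset" style metric reports the record most
# recently read -- hence committed == current + 1 once consumption catches up.
def committed_after_reading(last_read_offset: int) -> int:
    return last_read_offset + 1

print(committed_after_reading(2897))  # 2898
```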
hi
From the error it looks like the flink-sql-kafka jar is missing; add that jar to Flink/lib and run again.
-
Best Wishes
JasonLee
--
Can the side output stream approach be used to extract the values separately? How do I get the values?
JasonLee <17610775...@163.com> wrote on Sun, Apr 25, 2021 at 5:58 PM:
> hi
>
> You can use filter to split out multiple streams, or use side outputs to split the processing.
>
>
>
> -
> Best Wishes
> JasonLee
> --
>
Hello,
flink-sql-connector-kafka_2.11-1.11.3.jar
is already in Flink's lib directory.
maker_d...@foxmail.com
From: JasonLee
Sent: 2021-04-25 17:56
To: user-zh
Subject: Re: Submitting a FlinkSQL Kafka table throws "cannot load user class"
hi
From the error it looks like the flink-sql-kafka jar is missing; add that jar to Flink/lib and run again.
-
Best Wishes
JasonLee
hi
You can use filter to split out multiple streams, or use side outputs to split the processing.
-
Best Wishes
JasonLee
--
Hello everyone in the community,
I want to read a MySQL table via flink-cdc and then send it to a Kafka table.
When I use the sql-client to insert data into the Kafka table, I get the following error:
2021-04-25 17:21:03
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
We are using Flink 1.12.2. The requirement is to take a custom data structure (call it a)
out of the stream, assign it to a variable, and then use a's fields for some conditional checks.
For example:
val ds: DataStream[b] = stream.filter(_.nonEmpty).map(new MapFunction[String, b] {
  override def map(value: String) = {
    val recallKafka = JSON.parseObject(value, classOf[a])
Hi, I ran into the following issue:
This is my SQL:
SELECT distinct header,label,reqsig,dense_feat,item_id_feat,user_id_feat FROM
app.app_ranking_feature_table_clk_ord_hp_new_all_tree_orc where dt='2021-04-01'
When I use the BlinkPlanner in BatchMode, it works well; but if I set StreamMode,
it
Hi John,
Logically the maximum retained checkpoints are configured
by state.checkpoints.num-retained [1]. Have you configured
this option?
Best,
Yun
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#state-checkpoints-num-retained
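For reference, a minimal flink-conf.yaml fragment for that option (the value 3 is just an example):

```yaml
# Keep the 3 most recent completed checkpoints instead of the default 1
state.checkpoints.num-retained: 3
```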
Hi all,
I am building an application that launches Flink Jobs and monitors them.
I want to use the JobListener interface to output job evemts to a Kafka Topic.
The problem:
In the application we have RuleId, i.e. business logic identifier for the job,
and there’s JobId which is the
Hi!
Have any other devs noticed issues with Flink missing Kafka records with
long-running Flink jobs? When I re-run my Flink job and start from the
earliest Kafka offset, Flink processes the events correctly. I'm using
Flink v1.11.1.
I have a simple job that takes records (Requests) from Kafka
org.apache.kafka.connect.errors.ConnectException: An exception occurred in
the change event producer. This connector will be stopped.
at
io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
at
After the Flink SQL job was submitted, I noticed in the JobManager metrics that there are two Kafka offset metrics, currentOffsets and committedOffsets. With no new data in Kafka, after the program ran for a while the dashboard showed
currentOffsets: 2897
committedOffsets: 2898
and the two values no longer change (presumably all data has been consumed). My question: why are these two offset values still different? Does committedOffsets mean the offset that has been committed and saved in state? What does currentOffsets mean? Please advise, thanks!