各位好,
目前使用sql-client查询hbase数据时,无法查询成功,麻烦指导下,谢谢。
复现方法:
1、hbase操作:
hbase(main):005:0> create 'flink_to_hbase','cf1'
0 row(s) in 2.2900 seconds
hbase(main):006:0> put 'flink_to_hbase', 'rk0001', 'cf1:username', 'zhangsan'
0 row(s) in 0.0510 seconds
2、flink操作:
./start-cluster.sh
Hi,
一般需要确认一下是哪块引起的fullgc,比如metaspace还是堆内存过大导致的。如果是堆内存过大导致的,可以将内存dump下来,用一些分析工具例如mat、visualvm等具体查看一下哪些对象占比比较多,是否存在内存泄漏等原因
Best,
Shammon FY
On Fri, Aug 4, 2023 at 10:00 AM yidan zhao wrote:
> GC日志看GC原因
>
> 2278179732 <2278179...@qq.com.invalid> 于2023年8月3日周四 13:59写道:
> >
> >
Regular Join 默认把数据都存储在State中,通常会结合TTL来进行优化
guanyq 于2023年8月3日周四 15:59写道:
> 请问下多个表关联,这种flink sql如何优化呢,直接关联优点跑不动RuntimeExecutionMode.STREAMING 模式
>
> select
> date_format(a.create_time, '-MM-dd HH:mm:ss') as create_time,
> b.vehicle_code,
> a.item_name,
>
GC日志看GC原因
2278179732 <2278179...@qq.com.invalid> 于2023年8月3日周四 13:59写道:
>
> 大家好,请问下作业跑一段时间就会偶发出现背压,full gc看着很严重,有什么好的工具排查下吗?或者经验文档?谢谢!
请问下多个表关联,这种flink sql如何优化呢,直接关联优点跑不动RuntimeExecutionMode.STREAMING 模式
select
date_format(a.create_time, '-MM-dd HH:mm:ss') as create_time,
b.vehicle_code,
a.item_name,
a.item_value,
c.item_value as vehicle_score,
d.current_fault,
首先你窗口是30min,刚刚开始肯定会是涨的。
其次,后续稳定后,继续涨可能是因为流量在变化。
最后,流量不变情况下,还可能受到延迟的影响。
lxk 于2023年7月25日周二 11:22写道:
>
> 相关配置:
> Flink:1.16
>
> | Checkpointing Mode | Exactly Once |
> | Checkpoint Storage | FileSystemCheckpointStorage |
> | State Backend | EmbeddedRocksDBStateBackend |
> | Interval | 8m 0s |
>
>
>
这个取决于你是什么模型,比如python中sklearn的大多模型都可以导出成pmml格式模型,然后java用jpmml库就可以导入进行预测。
如果是tensorflow模型,也有,只不过我忘记了,你可以找找。
15904502343 <15904502...@163.com> 于2023年8月1日周二 16:48写道:
>
> 您好
> 我想知道是否有代码示例,可以在Flink程序中加载预先训练好的编码模型(用python编写)
你好,
不需要将所有的依赖都改为snapshot,仅需要将我们项目内的版本加上 snapshot 后缀。
可以在项目中统一替换版本号 1.x.x -> 1.x.x-SNAPSHOT,或者使用 mvn versions:set
-DnewVersion=1.x.x-SNAPSHOT 设置。
> 2023年8月2日 下午2:25,jinzhuguang 写道:
>
>
非常感谢你的提醒,我现在用maven工具修改了所有的版本号为snapshot,但是flink-connectors(connectors的父模块)也变成snapshot,打包的时候仓库里找不到他了,而且也没法想flink-runtime这些包手动改下版本好,这种该怎么办
> 2023年7月27日 11:05,Jiabao Sun 写道:
>
> 你好,
>
> 通常在 pom 中引入 maven-deploy-plugin,并且通过 声明私服地址,使用 mvn
> clean deploy 命令部署到nexus私服。
> 部署到 SNAPSHOT 仓库需要项目版本号包含
Hi, 我理解可以有两种方式:
1. 设定从某个存储集群上恢复并向另一个存储集群上快照,即设置[1]为 HDFS地址,[2] 为后面的对象存储地址
2. 还是在HDFS集群上启停作业,设置 savepoint 目录[3]到对象存储
关于 state processor api,目前 sql 作业确实操作起来比较困难,只能从日志里获取 uid 等信息,以及理解 sql
实际产生的状态才能使用;
[1]
/opt/flink/flink-1.17.1/bin/flink run-application -t yarn-application -yjm
1024m -ytm 1024m ./xx-1.0.jar
./config.properties以上提交命令制定的配置文件,为什么在容器内找配置文件?file
/home/yarn/nm/usercache/root/appcache/application_1690773368385_0092/container_e183_1690773368385_0092_01_01/./config.properties
does
*退订*
您好
我想知道是否有代码示例,可以在Flink程序中加载预先训练好的编码模型(用python编写)
我们要将当前在Hadoop Yarn上运行的flink
sql作业迁移到K8S上,状态存储介质要从HDFS更换到对象存储,以便作业能够从之前保存点恢复,升级对用户无感。
又因为flink作业状态文件内容中包含有绝对路径,所以不能通过物理直接复制文件的办法实现。
查了一下官网flink state processor api目前读取状态需要传参uid和flink状态类型,但问题是flink
sql作业的uid是自动生成的,状态类型我们也无法得知,请问有没有遍历目录下保存的所有状态并将其另存到另一个文件系统目录下的API ? 感觉state
processor
Hi,
如题,请教一下关于如何使用DataStream API实现有界流的join操作,我在调用join的时候必须要window,怎么避免,还是需要使用SQL
API才可以
感谢,
鱼
这个解决不了根本问题 主要是我们的任务比较多,业务上就需要保留几千个任务
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2023年07月28日 11:28,Shammon FY 写道:
Hi,
可以通过配置`jobstore.max-capacity`和`jobstore.expiration-time`控制保存的任务数,具体参数可以参考[1]
[1]
Hi
Flink UI 需要加载所有的 Job 信息并在 UI 渲染,在作业比较多的时候很容易导致 UI 卡死。
不只在这个页面,在一些并发比较大的任务上打开 subtask 页面也很容易导致UI 卡死。
Flink UI 需要一个分页的功能来减少数据加载和 UI 渲染的压力
Best,
Weihua
On Fri, Jul 28, 2023 at 11:29 AM Shammon FY wrote:
> Hi,
>
>
> 可以通过配置`jobstore.max-capacity`和`jobstore.expiration-time`控制保存的任务数,具体参数可以参考[1]
Hi,
可以通过配置`jobstore.max-capacity`和`jobstore.expiration-time`控制保存的任务数,具体参数可以参考[1]
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-jobmanager-options
Best,
Shammon FY
On Fri, Jul 28, 2023 at 10:17 AM 阿华田 wrote:
> 目前flink-job-history
>
目前flink-job-history 已经收录5000+任务,当点击全部任务查看时,job-history就会卡死无法访问,各位大佬有什么好的解决方式?
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
退订
On Thu, Jul 27, 2023 at 2:40 PM 许琦 wrote:
> *退订*
*退订*
你好,
通常在 pom 中引入 maven-deploy-plugin,并且通过 声明私服地址,使用 mvn
clean deploy 命令部署到nexus私服。
部署到 SNAPSHOT 仓库需要项目版本号包含 -SNAPSHOT 后缀,可以在IDE中全局替换,也可以使用 versions-maven-plugin
统一设置。
org.apache.maven.plugins
maven-deploy-plugin
2.8.2
我是基于flink 1.16.0开发的,由于版本号没有snapshot,现在又无法发布release版本的,我该怎么办?
好的,谢谢老师
在 2023-07-26 21:04:20,"Jiabao Sun" 写道:
>SqlSession 需要关闭,建议使用 SqlSessionManager,可以不用手动关闭 SqlSession。
>
>
>On 2023/07/18 02:13:16 lxk wrote:
>> 在flink内需要使用mybatis做些简化查询的工作,目前我的使用方式如下
>>
>> public class MybatisUtil {
>>
>> private static final Logger LOGGER =
>>
SqlSession 需要关闭,建议使用 SqlSessionManager,可以不用手动关闭 SqlSession。
On 2023/07/18 02:13:16 lxk wrote:
> 在flink内需要使用mybatis做些简化查询的工作,目前我的使用方式如下
>
> public class MybatisUtil {
>
> private static final Logger LOGGER =
> LogFactory.createNewLogger("MybatisUtil");
> private static ThreadLocal tl = new
退订
| |
lei-tian
|
|
totorobabyf...@163.com
|
你好,感谢回复。我使用reduce解决了问题。
祝好运。
发件人: weijie guo
发送时间: 2023年7月26日 10:50
收件人: user-zh@flink.apache.org
主题: Re: 关于DataStream API计算批数据的聚合值
你好:
Batch 模式下的 reduce 操作默认应该就是只输出最后一条数据(per-key)的。Agg 的话可能有点麻烦,可以使用
GlobalWindow + 自定义 Trigger 来 Workaround.
Best regards,
退订
wang <24248...@163.com> 于2023年7月13日周四 07:34写道:
> 退订
--
Best Regards,
*Yaohua Wang 王耀华*
School of Software Technology, Xiamen University
Tel: (+86)187-0189-5935
E-mail: wangyaohua2...@gmail.com
*退订*
你好,感谢老师回复
`insert into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,我理解这两种情况的话,对于数据库来说,就是一次插入与两次插入的问题了吧,要是数据量大的话,感觉对性能还是有影响的
| |
小昌同学
|
|
ccc0606fight...@163.com
|
回复的原邮件
| 发件人 | Shammon FY |
| 发送日期 |
Hi,
目前JdbcSink会为每个Sink创建PreparedStatement,当进行batch数据处理时,会先调用PreparedStatement的addBatch()函数将数据放入缓存,到达flush条件后调用executeBatch()函数批量发送数据到jdbc
server,这样会节省网络IO。
具体到数据库侧,我理解执行 `insert into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
你好:
Batch 模式下的 reduce 操作默认应该就是只输出最后一条数据(per-key)的。Agg 的话可能有点麻烦,可以使用
GlobalWindow + 自定义 Trigger 来 Workaround.
Best regards,
Weijie
Liu Join 于2023年7月26日周三 09:10写道:
> 例如:我使用DataStream api计算批数据也就是有界流的平均值,如何实现只输出最后一条平均值的数据,不输出中间值
>
Hi,
跟使用普通流式作业的DataStream用法一样,只需要在RuntimeMode里使用Batch模式,Flink在Batch模式下会只输出最后的结果,而不会输出中间结果。具体可以参考Flink里的WordCount例子
[1]
[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
On Wed, Jul
例如:我使用DataStream api计算批数据也就是有界流的平均值,如何实现只输出最后一条平均值的数据,不输出中间值
各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师:
我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式
是
insert into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10);
或者是
相关配置:
Flink:1.16
| Checkpointing Mode | Exactly Once |
| Checkpoint Storage | FileSystemCheckpointStorage |
| State Backend | EmbeddedRocksDBStateBackend |
| Interval | 8m 0s |
我有一个程序,主要是用来统计一些热门商品之类的数据
具体代码如下:
.keyBy(data -> data.getShopId() + data.getYh_productid())
Hi, Jiacheng:
helm也是把values.yaml和你在命令行中传入的参数,代入到一些k8s的yaml模版里,render出来一个yaml文件,然后提交到k8s的。
这些是flink kubernetes operator的helm的模版。
https://github.com/apache/flink-kubernetes-operator/tree/main/helm/flink-kubernetes-operator/templates
你完全可以自己人肉修改这些yaml模版,然后提交到k8s,当然这很麻烦,也很容易出错。
Hi,
运行的是哪个例子?从错误上看是在从MiniCluster获取结果的时候,MiniCluster被关闭了
Best,
Shammon FY
On Sat, Jul 22, 2023 at 3:25 PM guanyq wrote:
> 本地IDEA运行 MiniCluster is not yet running or has already been shut down.
> 请问是什么原因,如何处理
>
>
>
>
> 15:19:27,511 INFO
>
退订
请问没有helm的情况下能否安装flink-k8s-operator?安装operator是否可以不要clusterrole
本地IDEA运行 MiniCluster is not yet running or has already been shut down.
请问是什么原因,如何处理
15:19:27,511 INFO
org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] -
Stopping resource manager service.
15:19:27,503 WARN
请问,flink sql 能否通过sql语句将mysql表加载为flink 内存表
sql语句为多表关联
退订
代码如下:
package com.didichuxing.iov.func
import com.didichuxing.iov.util.JedisTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture,
RichAsyncFunction}
import org.slf4j.LoggerFactory
import redis.clients.jedis.Jedis
import
我也遇到类似的问题,我是链接失效了,最后没办法再注册了
> 2023年7月20日 14:54,李天龙 写道:
>
> 您好!
> 我想注册一个flink jira的账号,但由于提出的里有不充分给拒掉了,想再次申请,却提示邮箱已申请过,还未处理:
>
>
> There is already a pending Jira account request associated with this email
> address. Please wait for it to be processed
>
>
> 请问怎么解决这个问题,并且成功申请一个账号
>
>
>
>
您好!
我想注册一个flink jira的账号,但由于提出的里有不充分给拒掉了,想再次申请,却提示邮箱已申请过,还未处理:
There is already a pending Jira account request associated with this email
address. Please wait for it to be processed
请问怎么解决这个问题,并且成功申请一个账号
--
发自我的网易邮箱平板适配版
异常栈信息
```
2023-07-20 11:43:01,627 ERROR
org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Terminating
TaskManagerRunner with exit code 1.
org.apache.flink.util.FlinkException: Failed to start the TaskManagerRunner.
at
flink 1.17.1 on Yarn实时任务运行了几天出现了Yarn
token过期问题,在1.12未出现。这块具体有什么变化嘛,我是否还需要再配置其他参数。
具体配置:
```
security.kerberos.access.hadoopFileSystems: viewfs://AutoLfCluster;hdfs://ns1
security.kerberos.login.keytab: /xxx/krb5.keytab
security.kerberos.login.principal: flink/xxx
Hi casel
之前有类似的讨论, 不过暴露 ROWKIND 之后可能可以会造成 SQL 语义上的不明确,你可以在 dev 邮件在发起讨论看看,看看大家的想法。
https://issues.apache.org/jira/browse/FLINK-24547
Best,
Feng
On Wed, Jul 19, 2023 at 12:06 AM casel.chen wrote:
> 社区无人响应吗?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-07-15 12:19:46,"casel.chen" 写道:
>
社区无人响应吗?
在 2023-07-15 12:19:46,"casel.chen" 写道:
>Flink社区能否考虑将ROWKIND做成元数据metadata,类似于kafka
>connector中的offset和partition等,用户可以使用这些ROWKIND
>metadata进行处理,例如对特定ROWKIND过滤或者答输出数据中添加ROWKIND字段
退订
在flink内需要使用mybatis做些简化查询的工作,目前我的使用方式如下
public class MybatisUtil {
private static final Logger LOGGER = LogFactory.createNewLogger("MybatisUtil");
private static ThreadLocal tl = new ThreadLocal();
private static SqlSessionFactory factory = null;
//private static SqlSession
Hi, 在1.15之前,一般是通过history server[1]去拿到最终状态,在1.15之后可以设置这两个Experimental参数
execution.shutdown-on-application-finish[2]
execution.submit-failed-job-on-application-error[3]
设置两个参数的前提条件是必须开启了JobManager的HA[4]
[1]:
history server是不是有延迟性 做不到实时获取任务的状态
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2023年07月15日 12:14,casel.chen 写道:
可以查看history server
在 2023-07-14 18:36:42,"阿华田" 写道:
hello 各位大佬, flink on K8S ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了
无法判断flink任务是正常Finished
Hi,
根据上面的异常栈信息,你可以检查一下是否配置了cluster id,在yarn里配置项是`yarn.application.id`
Best,
Shammon FY
On Sat, Jul 15, 2023 at 6:50 PM 杨东树 wrote:
> 您好,
>针对sql-client运行在yarn-session模式报错,现补充相关日志报错信息:
> 2023-07-15 18:43:21,503 INFO org.apache.flink.table.client.cli.CliClient
> [] -
您好,
针对sql-client运行在yarn-session模式报错,现补充相关日志报错信息:
2023-07-15 18:43:21,503 INFO org.apache.flink.table.client.cli.CliClient
[] - Command history file path: /root/.flink-sql-history
2023-07-15 18:43:28,225 INFO org.apache.flink.table.catalog.CatalogManager
Flink社区能否考虑将ROWKIND做成元数据metadata,类似于kafka
connector中的offset和partition等,用户可以使用这些ROWKIND
metadata进行处理,例如对特定ROWKIND过滤或者答输出数据中添加ROWKIND字段
可以查看history server
在 2023-07-14 18:36:42,"阿华田" 写道:
>
>
>hello 各位大佬, flink on K8S ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了
>无法判断flink任务是正常Finished 还是异常失败了,各位大佬有什么建议吗
>| |
>阿华田
>|
>|
>a15733178...@163.com
>|
>签名由网易邮箱大师定制
>
hello 各位大佬, flink on K8S ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了
无法判断flink任务是正常Finished 还是异常失败了,各位大佬有什么建议吗
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
Hi,
我觉得增加到3分钟可能不是一个合适的方法,这会增加作业恢复时间。建议还是追查一下为什么上游task这么长时间没有部署启动成功比较好。
Best,
Shammon FY
On Fri, Jul 14, 2023 at 2:25 PM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:
> hi, 上次将`taskmanager.network.request-backoff.max` 从默认的10s增加到30s后 跑了5天还是出现
> PartitionNotFoundException循环重启
>
Hi, Community. There was an issue that happened to one of our Flink Streaming
jobs using 1.14.3 and that job didn't enable JobManager HA. The issue is after
the only jobManager pod's flink-main-container restarted, some of the
taskManager pods keep throwing the below exception:
INFO
退订
嗨你好,用于sort的中间数据是存储在状态后端当中吗,数据量很大的情况下。
> 2023年7月12日 19:48,weijie guo 写道:
>
> 你好,
> 首先,Batch Shuffle 的中间数据都是会落盘的。其次,对于 Sort 这个操作来说,上面给出的解法和Dataset一致,都不会落盘。
>
> Best regards,
>
> Weijie
>
>
> jinzhuguang 于2023年7月12日周三 17:28写道:
>
>> 如果我的数据量很大,内存装不下,flink在batch
>>
你好,
首先,Batch Shuffle 的中间数据都是会落盘的。其次,对于 Sort 这个操作来说,上面给出的解法和Dataset一致,都不会落盘。
Best regards,
Weijie
jinzhuguang 于2023年7月12日周三 17:28写道:
> 如果我的数据量很大,内存装不下,flink在batch
> mode下的行为是否会像传统的批处理系统,例如hive那样,会进行shuffe、中间数据落盘等操作。
>
> > 2023年7月12日 17:05,weijie guo 写道:
> >
> >
>
如果我的数据量很大,内存装不下,flink在batch mode下的行为是否会像传统的批处理系统,例如hive那样,会进行shuffe、中间数据落盘等操作。
> 2023年7月12日 17:05,weijie guo 写道:
>
> 你好,对于DataSet中不按照key进行全量聚合/排序的API(例如,sortPartition/mapPartition),DataStream上目前没有直接提供相同的API,但可以通过组合DataStream上现有的API实现相同的功能。
> 以mapPartition为例,可以通过以下三个步骤实现相同的功能:
> 1.
你好,对于DataSet中不按照key进行全量聚合/排序的API(例如,sortPartition/mapPartition),DataStream上目前没有直接提供相同的API,但可以通过组合DataStream上现有的API实现相同的功能。
以mapPartition为例,可以通过以下三个步骤实现相同的功能:
1. dataStream.map(record -> (subtaskIndex,
record)),为每个Record增加处理该record时子任务编号。
2.
Hello:
请教2个问题。
1、flink 使用sql-client.sh -f xx.sql 怎么传递参数修改sql里面的文件。比如MySQL,Kafka的连接地址。
2、flink sql消费Kafka
设置group-offset,group.id之前没提交过,会直接报错。怎么设置成没提交过从earliest消费等等。
感谢大家
Flink 社区在这个 thread 讨论了这个问题,之后会出一个 guideline 来帮助用户迁移 DataSet API
[1] https://lists.apache.org/thread/r0y9syc6k5nmcxvnd0hj33htdpdj9k6m
Best regards,
Yuxia
- 原始邮件 -
发件人: "jinzhuguang"
收件人: "user-zh"
发送时间: 星期二, 2023年 7 月 11日 下午 7:16:06
主题: 如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?
作业已重启 其他日志暂时没有了
在 2023-07-12 11:06:31,"Shammon FY" 写道:
>Hi
>
>你可以贴一下完整的异常栈信息,这可以帮助定位具体问题
>
>Best,
>Shammon FY
>
>
>On Wed, Jul 12, 2023 at 10:52 AM chenyu_opensource <
>chenyu_opensou...@163.com> wrote:
>
>> 目前是用flink1.12版本,从kafka中读取数据到hdfs,前期运行正常,过段时间报错:
>> Caused by:
Hi
你可以贴一下完整的异常栈信息,这可以帮助定位具体问题
Best,
Shammon FY
On Wed, Jul 12, 2023 at 10:52 AM chenyu_opensource <
chenyu_opensou...@163.com> wrote:
> 目前是用flink1.12版本,从kafka中读取数据到hdfs,前期运行正常,过段时间报错:
> Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
>
目前是用flink1.12版本,从kafka中读取数据到hdfs,前期运行正常,过段时间报错:
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
org.apache.hadoop.ipc.RemoteException(java.io.IOException):
BP-504689274-10.204.4.58-1507792652938:blk_3265799450_2192171234 does not exist
or is not under Constructionnull
flink
Flink社区能否考虑将ROWKIND做成元数据metadata,类似于kafka connector中的offset,
partition,用户可以引用这些metadata进行过滤操作?
在 2023-07-10 23:39:00,"yh z" 写道:
>Hi, shi franke. 你可以尝试自己实现一个 DynamicTableSink,在里面添加参数 “sink.ignore-delete”。
>你可以参考 github 上的一些实现,例如 clickhouse:
目前,我在flink 1.16还没有发现DataStream有实现分区排序的接口,现在是否有方法在有界数据集上实现分区、排序这个需求呢?
目前,我在flink 1.16还没有发现DataStream有实现分区排序的接口,现在是否有方法在有界数据集上实现分区、排序这个需求呢?
你好,整个程序有反压吗
在 2023-07-10 15:32:44,"jiaot...@mail.jj.cn" 写道:
>Hello,
> 我定义了一个pattern (a->b->c->d->e->f->g)在10分钟内匹配,通过在WebUI上查看任务很快在cep节点
> busy(max)100%,我发现通过增加cep节点的并发度并不能解决问题,且checkpoint随着时间的推移状态大小越来越大,数据应该存在大量堆积。数据源同时消费4个kafka
> topic
>
Hi, shi franke. 你可以尝试自己实现一个 DynamicTableSink,在里面添加参数 “sink.ignore-delete”。
你可以参考 github 上的一些实现,例如 clickhouse:
https://github.com/liekkassmile/flink-connector-clickhouse-1.13
shi franke 于2023年7月7日周五 19:24写道:
>
>
退订
Hi,
邮件里的图片看不到
Best,
Shammon FY
On Sun, Jul 9, 2023 at 7:30 PM 杨东树 wrote:
> 各位好,
>目前我在使用flink1.14.5版本的sql-client on
> yarn-session模式时,发现无法正常执行sql任务,日志报如下错误,希望能得到指导,谢谢:
>背景信息:
>1、当flink配置execution.target:
> yarn-per-job时,随后进入sql-client执行sql任务,可正常执行。
>
各位好,
目前我在使用flink1.14.5版本的sql-client on
yarn-session模式时,发现无法正常执行sql任务,日志报如下错误,希望能得到指导,谢谢:
背景信息:
1、当flink配置execution.target: yarn-per-job时,随后进入sql-client执行sql任务,可正常执行。
2、当flink配置execution.target: yarn-session,并启动flink
yarn-session集群,随后进入sql-client执行同样的sql任务,报上图中的错误。
感谢您的回复,这样自定义是可以实现的,我们目前使用的是1.15的flink版本。想看一下社区是不是有在框架层面实现这个配置的支持,理解这应该也是一个相对common的配置
junjie.m...@goupwith.com 于2023年7月7日周五 17:57写道:
> 可以自己用DataStream API通过RowKind进行过滤。
> 如下示例代码:import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.types.Row;
> import
咨询下各位大佬,请问下connector现在有支持忽略delete消息的选项配置
吗?比如上游的数据是一个upsert/retract流,在connector这里是否有选项去忽略delete
message,当作append流只去戳里insert消息。我看现在代码没有类似的功能,不确定是否有相关的jira或者实现
您好:
我在尝试使用coGroup对两条流进行连接,使用的依赖版本是flink-streaming-java:1.16.1,流连接的代码如下:
DataStream dataStream3=
dataStream1.coGroup(dataStream2).where(Data1::getKey).equalTo(Data2::getKey)
.window(TumblingEventTimeWindows.of(Time.seconds(5*60)))//窗口大小5min
好的,非常感谢,我试试用 Hint 来控制 join 类型。
发件人: yh z
发送时间: 2023年7月6日 12:10
收件人: user-zh@flink.apache.org
主题: Re: Flink 1.16 流表 join 的 FilterPushDown 及并行
Hi, Chai Kelun, 你的 filter condition 里面包含了你自定义的 UDF,是不满足 filter push down
的条件的,因为对于优化器来说 UDF 是不确定的,优化器不能从里面提取到可以下推的条件,
Hi, aiden. 看起来是类冲突,按照官方的文档,使用 kafka 时,你应该是不需要引入 flink-core 和
flink-connector-base 的(
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/)。如果是因为其他原因需要使用这两个
jar, 你可以使用 mvn dependency::tree 查看一下
"org/apache/kafka/clients/consumer/ConsumerRecord"
Hi, Chai Kelun, 你的 filter condition 里面包含了你自定义的 UDF,是不满足 filter push down
的条件的,因为对于优化器来说 UDF 是不确定的,优化器不能从里面提取到可以下推的条件, 如果你想实现下推,可以尝试抽取下确定性的
condition,如 product.id > 10 etc.。另外,Flink 是支持 broadcast hash join
的,如果你想控制某两个表的 join type,你可以通过 join hint 来指定 join 类型为 broadcast。()
Chai Kelun 于2023年7月3日周一
Hi,
我们的做法是启动Flink集群后,在其他节点(pod或者独立启动)启动Sql-Gateway,通过Flink的地址远程连接Flink集群,这样Sql-Gateway的部署和Flink集群完全分开
Best,
Shammon FY
On Tue, Jul 4, 2023 at 10:52 AM chaojianok wrote:
> 大家好,请教个问题。
>
> 用native kubernetes方式在k8s集群上部署好了flink,现在需要在这个flink集群里使用flink sql
> gateway,大家有什么好的方案吗?
> 目前的做法是,进入pod里启动sql
Hi,
如果要增加request
partition的重试时间,可以调整配置项`taskmanager.network.request-backoff.max`,默认是10秒,具体配置可以参阅[1]
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-taskmanageroptions
Best,
Shammon FY
On Tue, Jul 4, 2023 at 11:38 AM zhan...@eastcom-sw.com <
Hi,
PartitionNotFoundException异常原因通常是下游task向上游task发送partition
request请求,但是上游task还没有部署成功。一般情况下,下游task会重试,超时后会报出异常。你可以查看下有没有其他的异常日志,查一下上游task为什么没有部署成功。
Best,
Shammon FY
On Tue, Jul 4, 2023 at 9:30 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:
>
> 异常日志内容
>
> 2023-07-03 20:30:15,164
退订
有一张 kafka 流表 logClient(id int, name string, price double),一张实现了
SupportsFilterPushDown 的 customConnector 维表 product(id int, name string, value
double),实现了自定义函数 MyUDF(double d1, double d2) 用于自定义逻辑计算并支持该算子的下推。
在 Stream-Table Join 的场景下,下列 SQL 并没有将算子进行下推,而是通过 TableScan 将所有算子提到 Join
Hi
可以的,DataStream有很多内置的数据类型,也支持自定义数据类型和数据的序列化反序列化,然后在DataStream的计算内对数据执行计算,可以参考DataStream的官方文档[1]和数据类型文档[2]
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/
[2]
请教下,如果用flink进行批计算,使用DataStream API有没有什么优化的地方,是否可以直接将数据作为矩阵在算子之间传递进行计算
mysql库中设置的是utf8mb4编码,单独sql查询mysql表没有出现中文乱码
使用flink datastream作业通过cdc消费mysql
binlog并写到下游doris表时遇到字符串长度超长问题,我们是按mysql表schema创建的doris
schema,就很奇怪为什么总是报字符串超长错误。于是将异常时的原始数据打印出来,才发现数据中只要包含了中文字符都会显示成乱码,要么都是???,要么都是其他莫名字符。
没有人遇到过这个问题吗?
在 2023-06-19 10:41:30,"casel.chen" 写道:
>Flink作业消费数据源来自mysql业务表,后者使用了`-00-00 00:00:00`这个dummy date来存时间,直接用Flink
>CDC消费的话会被解析成`1970-01-01 00:00:00` (mysql中是datetime类型)或者`1970-01-01 08:00:00`
>(mysql中是timestamp类型)。
>问题1:可否给个Flink
感谢姜老师回复,请问如何使用OnlineLogiticRegression生成连续模型表,有样例程序吗
At 2023-06-10 19:18:37, "姜鑫" wrote:
>Hi Xun,
>
>The OnlineLogisticRegression model data is represented as Flink Tables. A
>Flink table can be unbounded and contains continuous model data, so the way to
>update model is just insert new model
我排查了一下,因为任务其实是跑在本地模式上,而我一直没有配置本地模式的slot数量导致slot不足,而这个1024G其实是一个默认值所以出现了需求1T内存这种奇怪的报错。
以往没有出现这种问题是因为以前本地模式会自动分配足够的slot,但flink doris
connecter由于未知的原因没有被计入slot需求中,这就导致缺少一个slot无法达到需求。
> 2023年6月19日 16:18,郭欣瑞 写道:
>
> 我在ide里测试一个任务的时候,任务一直处于created状态,过了很久之后报了以下的错
>
>
退订
在 2023-06-20 11:16:18,"Yanfei Lei" 写道:
>Hi,
>
>从 ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes),
>taskOffHeapMemory=1024.000gb (1099511627776 bytes),
>managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb
>(67108864 bytes)}, numberOfRequiredSlots=1}]
Hi,
从 ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes),
taskOffHeapMemory=1024.000gb (1099511627776 bytes),
managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb
(67108864 bytes)}, numberOfRequiredSlots=1}] 来看,sink节点想申请 1T的 heap
memory 和 1T的 off heap
Hi,
这个doris的sink是你自己实现的还是flink或者doris官方提供的?从错误来看,像是sink节点申请了超大的内存资源,你可以确认一下是否有问题,或者是否有配置项可以配置
Best,
Shammon FY
On Mon, Jun 19, 2023 at 4:19 PM 郭欣瑞 wrote:
> 我在ide里测试一个任务的时候,任务一直处于created状态,过了很久之后报了以下的错
>
> DeclarativeSlotPoolBridge.java:351 - Could not acquire the minimum
> required
我在ide里测试一个任务的时候,任务一直处于created状态,过了很久之后报了以下的错
DeclarativeSlotPoolBridge.java:351 - Could not acquire the minimum required
resources, failing slot requests. Acquired:
[ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb
(1099511627776 bytes),
共有 16769 项搜索結果,以下是第 601 - 700 matches
Mail list logo