请问下多个表关联,这种flink sql如何优化呢,直接关联优点跑不动RuntimeExecutionMode.STREAMING 模式
select
date_format(a.create_time, '-MM-dd HH:mm:ss') as create_time,
b.vehicle_code,
a.item_name,
a.item_value,
c.item_value as vehicle_score,
d.current_fault,
/opt/flink/flink-1.17.1/bin/flink run-application -t yarn-application -yjm
1024m -ytm 1024m ./xx-1.0.jar
./config.properties以上提交命令制定的配置文件,为什么在容器内找配置文件?file
/home/yarn/nm/usercache/root/appcache/application_1690773368385_0092/container_e183_1690773368385_0092_01_01/./config.properties
does
本地IDEA运行 MiniCluster is not yet running or has already been shut down.
请问是什么原因,如何处理
15:19:27,511 INFO
org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] -
Stopping resource manager service.
15:19:27,503 WARN
请问,flink sql 能否通过sql语句将mysql表加载为flink 内存表
sql语句为多表关联
请问下
flink如何监控并实时读取远程服务器的日志目录中所有日志文件内容
日志服务器(可以ssh连接,IP/用户名/密码)
41写道:
>
>> Hi
>>
>>
>> 如果要启动hiveserver2协议的gateway,需要将jar包flink-connector-hive_${scala.binary.version}放入到gateway的lib目录
>>
>> Best,
>> Shammon FY
>>
>>
>> On Sun, Mar 26, 2023 at 12:07 PM guanyq wrote:
>>
>> > 本地启动了flink及hive在启动
本地启动了flink及hive在启动sql gateway时有以下异常,请问还需要其他什么操作么
./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.type=hiveserver2
-Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/usr/local/app/apache-hive-3.1.2-bin/conf
异常信息
Available factory identifiers are:
rest
at
10:16:48,"Guojun Li" 写道:
>Hi
>
>确认一下这些 ha 文件的 last modification time 是一致的还是错开的?
>
>另外,指定 chk- 恢复尝试了没有?可以恢复吗?
>
>Best,
>Guojun
>
>On Fri, Mar 10, 2023 at 11:56 AM guanyq wrote:
>
>> flink ha路径为 /tmp/flink/ha/
>> flink chk路径为 /tmp/flink/checkp
ployment/config/#yarn-application-attempt-failures-validity-interval
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 4:27 PM guanyq wrote:
>
>> 图片在附件
>> 但是实际却是超过了10次。。
>>
>>
>>
>>
>>
>>
>> 在 2023-03-13 15:39:39,"Weihua Hu&
图片在附件
但是实际却是超过了10次。。
在 2023-03-13 15:39:39,"Weihua Hu" 写道:
>Hi,
>
>图片看不到了
>
>按照这个配置,YARN 应该只会拉起 10 次 JobManager。
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 3:32 PM guanyq wrote:
>
>> flink1.10版本,flink配置如下
>> yarn.app
flink1.10版本,flink配置如下
yarn.application-attempts = 10 (yarn尝试启动flink job的次数为10)
正常我理解yarn会尝试10次启动flink job,如果起不来应该就会失败,但是在yarn应用页面看到了尝试11次,如下图
请问appattempt_1678102326043_0006_000409每个序号不是代表一次尝试么
:
>Hi
>
>一般来说只是 YARN 集群异常停电不会影响已经完成的历史 Checkpoint(最后一次 Checkpoint 可能会写 hdfs 异常)
>
>有更详细的 JobManager 日志吗?可以先确认下 Flink 在恢复时检索到了多少个 completedCheckpoint
>以及最终尝试从哪一次 cp 恢复的。
>
>也可以尝试按照 Yanfei 所说指定历史的 cp 作为 savepoint 恢复
>
>
>Best,
>Weihua
>
>
>On Fri, Mar 10, 2
kpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
>是观察到checkpoint dir下面没有文件吗?
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
>
>guanyq 于2023年3月10日周五 08:58写道:
>>
>> 目前也想着用savepoint处理异常停电的问题
&
gt;作业恢复失败时,作业本身很难确定失败是checkpoint的文件块损坏之类的原因。如果你的作业做过savepoint,可以尝试从指定的savepoint恢复作业
>
>Best,
>Shammon
>
>On Thu, Mar 9, 2023 at 10:06 PM guanyq wrote:
>
>> 前提
>> 1.flink配置了高可用
>> 2.flink配置checkpoint数为10
>> 3.yarn集群配置了任务恢复
>> 疑问
>> yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动
>>
>>
>>
>>
前提
1.flink配置了高可用
2.flink配置checkpoint数为10
3.yarn集群配置了任务恢复
疑问
yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动
请问下 flink executeAsync() 一般都什么使用场景
无限流情况下executeAsync() / execute()是一样的吧
主要是想问下 场景/区别
请问flink每个版本的新特性在哪里有介绍.
咨询下各位大佬
flink sql在做批处理时,生产环境一般都用什么来做定时调度?
如果存在job之间的依赖,生产环境是又是采用什么来做通知的?
我这面主要是想把hive sql 修改为 flink sql
看了FFA的分享(流批一体) Flink1.15版本推出 MVP版本,动态表存储的流批一体
请问MVP版本是收费版么?
kafka实时流关联hive的最新分区表数据时,关于缓存刷新的问题
'streaming-source.monitor-interval'='12 h'
这个参数我理解是:按照启动开始时间算起,每12小时读取一下最新分区的数据是吧?
还有个问题是读取最新分区的时间间隔之间,实时流里面进入了预关联新分区的数据,那么是不是就相当于关联的还是上一次的最新分区数据吧?
哪位大佬分享下temporal join hive的demo,
参考下
我这面本地一直有问题。
请问下:
我记得之前的版本有redis connector,但是为什么现在版本的官网里面没有redis connector了
请大佬指导下:
需求: 通过flink sql 统计每天各个省份的订单受理量,显然这种维度统计时递增,如何设置ttl,只想让维度存储1周的数据。
维度递增很可能会导致内存溢出,请教下flink sql ttl 配置在官网哪里有说明么。
请大佬指导下:
table api grooupbywindow,为什么输出是Empty set。
public class GroupByWindow {
public static void main(String[] args) {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
List lst
请大佬指导下:
table api grooupbywindow,为什么输出是Empty set。
请大佬指导下:
flink streaming可以指定partition消费kafka么
如有100个partition,但是我只想消费15partiton。
请大佬指导下:
-- 在 Flink SQL 中注册 HBase 表 "mytable"
CREATETABLEhTable(rowkeyINT,family1ROW,family2ROW,family3ROW,PRIMARYKEY(rowkey)NOTENFORCED)WITH('connector'='hbase-1.4','table-name'='mytable','zookeeper.quorum'='localhost:2181');
Flink sql在读取hbase表时,是一次将数据加载到内存还是每次加载一批数据呀?
请大佬指导下:
动态表
时态表
版本表
这三个是什么关系?
flinksql里面还有其他一些表的概念么?
flink on yarn 在集群中启动很多的task,生产应用中是如何监控task的日志,和checkpoint的呢?
求大佬指导。
flink 1.12版本
kafka版本0.11版本
目前可以消费,但是偏移量无法提交到kafka
我试过相同的代码,kafka版本2.4.1就可以提交偏移量到kafka
目前kafka 0.11版本有问题。无法提交。
有没有大佬帮忙想想办法。如何解决这个版本问题。
kafka版本0.11
目前查看消费组的解压情况,报消费组不存在。
TimeUnit;
import static org.apache.flink.table.api.Expressions.*;
/**
* æ°å¢
*
* @author guanyq
* @date 2021/2/24
*/
public class DistinctAggregation3 {
public static void main(String[] args) throws Exception {
// get env
final StreamExecutionEnvironment env =
StreamExecutionEnvironment
求一个Flink1.12.0版本,Versioned Tables的demo。
CREATETABLEproducts(product_idSTRING,product_nameSTRING,priceDECIMAL(32,2),update_timeTIMESTAMP(3)METADATAFROM'value.source.timestamp'VIRTUAL,PRIMARYKEY(product_id)NOTENFORCEDWATERMARKFORupdate_timeASupdate_time)WITH(...);
./bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
./bin/flink savepoint fea3d87f138ef4c260ffe9324acc0e51 [:targetDirectory]
application_1610788069646_0021
[:targetDirectory]
hdfs:///flink/savepoints
在 2021-01-21 10:24:31,"刘海" 写道:
>Hi
>
请问下如何选择kafka connector的版本
如果选择1.12.0版本,就没有FlinkKafkaProducer09/FlinkKafkaConsumer09
org.apache.flink
flink-connector-kafka_${scala.binary.version}
看错误是与hadoop-common-2.7.4.jar冲突,但是不知道如何解决。
help
2021-01-1915:12:47,922ERRORorg.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
[] - Fatal error occurred in ResourceManager.
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
Could not start the
各位大佬。help
flink1.9.2版本升级到1.12.0版本
flink on yarn部署
异常日志如下
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Could not deploy Yarn job
flink1.9读取阿里RocketMQ
如何设置AccessKey,SecretKey 参数
finalRMQConnectionConfigconnectionConfig=newRMQConnectionConfig.Builder().setHost("localhost").setPort(5000)build();
附件图片,想把listener出来的数据,传给ctx。
如何实现这个数据的传递。
public class RMQRichParallelSource extends RichParallelSourceFunction
implements MessageOrderListener {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Properties properties = new Properties();
//
问题1
./bin/flink run -m
yarn-cluster每次提交是都会产生一个新的APP-ID如:application_1567067657620_0254
当yarn application -kill application_1567067657620_0254后,
在提交./bin/flink run -m yarn-cluster如何不让这个appid递增?
问题2
./bin/flink run -m yarn-cluster提交任务后。cancel掉job,如何在提交到这个appid上?
附件为错误日志。哪位大佬帮忙分析下。2020-06-20 08:39:47,829 INFO org.apache.flink.yarn.YarnTaskExecutorRunner
-
2020-06-20 08:39:47,830 INFO org.apache.flink.yarn.YarnTaskExecutorRunner
-
非常感谢,问题解决了!
在 2020-06-09 08:27:47,"马阳阳" 写道:
>
>
>
>我们也遇到过这个问题,我们当时遇到的问题是YARN NM上的默认charset是ascii。
>通过在flink-conf.yaml文件里添加如下配置解决了该问题:
>env.java.opts.taskmanager: "-Dfile.encoding=UTF-8"
>
>
>
>
>
>
>
>
>
>
&
kafka 0.11版本
首先kafka source topic数据是正常的,kafka客户端消费出来无中文乱码问题
1.本地idea debug运行,无中文乱码问题
2.服务器Standalone模式运行,无中文乱码问题
3.服务器on yarn提交方式,就出现中文乱码问题
flink 消费kafka的api用的是这个
new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props);
根据1,2,3分析问题可能和yarn有关系。请教一下大佬们,还需要怎么调查,才能解决这个问题。
请教下大佬们,想知道flink1.9.0版本对应pushgateway和prometheus的版本号分别都是多少。
附件是错误日志
我感觉看到错误日志之后,没有什么调查方向,应该怎么调查呢。2020-06-04 13:19:15,590 INFO
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction -
FlinkKafkaProducer 7/12 - checkpoint 3425 complete, committing transaction
TransactionHolder{handle=KafkaTransactionState [transactionalId=null,
can be specified by topic name and partition id. | Gauge |
| Operator | currentOffsets | topic, partition | The consumer's current read
offset, for each partition. A particular partition's metric can be specified by
topic name and partition id. | Gauge |
在 2020-06-03 15:02:24,"g
>发送时间: 2020年6月3日(星期三) 下午2:29
>收件人: "user-zh"抄送: "user-zh"主题: 回复:flink1.9,如何实时查看kafka消费的挤压量
>
>
>
>一般是kafka自带的查看消费组的命令工具可以看
>./kafka-consumer-groups.sh --describe --group test-consumer-group
>--bootstrap-server
>
>
>| |
>Zhonghan Tang
>|
>|
>13122260...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2020年06月3日 14:10,guanyq请加个问题
>
>1.消费kafka时,是如何实时查看kafka topic的挤压量的?
请加个问题
1.消费kafka时,是如何实时查看kafka topic的挤压量的?
>> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够;
-- 能粘贴下代码么
-- 还有提交的命令
>> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上;
-- 什么模式提交的job(yarn session,yarn,还是stand alone模式)
在 2020-05-25 11:47:48,"tison" 写道:
flink版本:1.9
stateBackEnd:FsStateBackEnd
附件:
checkpoint失败截图及日志,应该如何分析解决这种失败?2020-05-10 21:22:09,336 INFO org.apache.flink.yarn.YarnTaskExecutorRunner
-
2020-05-10 21:22:09,337 INFO
flink版本:1.9
stateBackEnd:FsStateBackEnd
附件:
checkpoint失败截图及日志,应该如何分析解决这种失败?
2020-05-10 21:22:09,336 INFO org.apache.flink.yarn.YarnTaskExecutorRunner
-
2020-05-10 21:22:09,337 INFO
附件为日志,麻烦帮忙分析一下。
2020-05-10 21:22:09,336 INFO org.apache.flink.yarn.YarnTaskExecutorRunner
-
2020-05-10 21:22:09,337 INFO org.apache.flink.yarn.YarnTaskExecutorRunner
- Starting
定位到问题点了。
和这个keyBy有关,后一种可以读取MapState,前一种报错
我自己又实现了一个MapState的存储job,用同样的代码是可以读取出所有MapState的key的。
在 2020-04-30 15:23:39,"guanyq" 写道:
>public class MyKeyedMapState {public String key;public
>ListString value;public MyKeyedMapState() {
>}public String getKey() {return key;
>}public
;}public void setValue(ListString
value) {this.value = value;}}改成这个也还是报同样的错误!
在 2020-04-30 15:19:18,"shao.hongxiao" <17611022...@163.com> 写道:
>
>
>用一个正常的Java pojo试一下
>| |
>邵红晓
>|
>|
>邮箱:17611022...@163.com
>|
>签名由网易邮箱大师
有没有发现,我这还是报错。
在 2020-04-30 09:40:45,"shx" <17611022...@163.com> 写道:
>能发一下写入状态的代码看一下吗,还有一个问题,键值状态访问,你的代码里是读出了所有key关键的mapstate吗,谢谢
>
>
>
>
>| |
>邵红晓
>|
>|
>邮箱:17611022...@163.com
>|
>
>签名由 网易邮箱大师 定制
>
>在2020年04月30日 09:04
还有其他可能的原因么。
在 2020-04-30 10:25:32,"guanyq" 写道:
附件是代码
还有一个问题,键值状态访问,你的代码里是读出了所有key关键的mapstate吗
-- 代码是读出所有map状态的key。
在 2020-04-30 09:40:45,"shx" <17611022...@163.com> 写道:
>能发一下写入状态的代码看一下吗,还有一个问题,键值状态访问,你的代码里是读出了所有key关键的mapstate吗,谢
gt;签名由网易邮箱大师定制
>在2020年4月30日 09:40,shx<17611022...@163.com> 写道:
>能发一下写入状态的代码看一下吗,还有一个问题,键值状态访问,你的代码里是读出了所有key关键的mapstate吗,谢谢
>
>
>
>
>| |
>邵红晓
>|
>|
>邮箱:17611022...@163.com
>|
>
>签名由 网易邮箱大师 定制
>
>在2020年04月30日 09:04,guanyq 写道:
>代码中没特别指定Seri
附件是代码
还有一个问题,键值状态访问,你的代码里是读出了所有key关键的mapstate吗
-- 代码是读出所有map状态的key。
在 2020-04-30 09:40:45,"shx" <17611022...@163.com> 写道:
>能发一下写入状态的代码看一下吗,还有一个问题,键值状态访问,你的代码里是读出了所有key关键的mapstate吗,谢谢
>
>
>
>
>| |
>邵红晓
>|
>|
>邮箱:17611022...@163.com
&g
s-release-1.10/dev/stream/state/schema_evolution.html
>
>Best,
>Congxian
>
>
>guanyq 于2020年4月29日周三 下午6:09写道:
>
>>
>> 附件是代码和错误日志。目前不知道如何调查。麻烦帮忙看下 谢谢。
ok 找到原因了!不好意思!
在 2020-04-16 08:03:29,"guanyq" 写道:
>代码里面是有env.execute,提交job出现以下错误,可能时什么原因?
>The program didn't contain a Flink job. Perhaps you forgot to call execute()
>on the execution environment.
ok 找到原因了!不好意思!
在 2020-04-16 08:03:29,"guanyq" 写道:
>代码里面是有env.execute,提交job出现以下错误,可能时什么原因?
>The program didn't contain a Flink job. Perhaps you forgot to call execute()
>on the execution environment.
代码里面是有env.execute,提交job出现以下错误,可能时什么原因?
The program didn't contain a Flink job. Perhaps you forgot to call execute() on
the execution environment.
使用的是perjob模式提交作业,没有使用yarn-seesion。为什么perjob模式提交有这个-yd参数会有问题,还是没太懂。
在 2020-04-15 08:52:11,"tison" 写道:
>-yd 参数影响的是你是否使用 perjob 模式提交作业,简单地说
>
>with -yd 以 perjob 模式提交作业,即启动一个新集群
>without -yd 提交到一个现有的 Flink on YARN 集群
>
>哪个是你的需求呢?有没有实现用 yarn-session 启动 Flink on YARN 集群呢?
提交失败,本人测试与-yd参数有关系,这个参数去掉就可以提交了。但是不知道 -yd这个参数影响了什么?
At 2020-04-14 15:31:00, "guanyq" wrote:
>提交失败,yarn资源也还有很多,为什么会提交失败呢?
>
>提交脚本
>./bin/flink run -m yarn-cluster \
>-ynm TestDataProcess \
>-yd \
>-yn 2 \
>-ytm 1024 \
>-yjm 1024 \
>-c com.data.processing
提交失败,yarn资源也还有很多,为什么会提交失败呢?
提交脚本
./bin/flink run -m yarn-cluster \
-ynm TestDataProcess \
-yd \
-yn 2 \
-ytm 1024 \
-yjm 1024 \
-c com.data.processing.unconditionalacceptance.TestDataProcess \
./tasks/UnconditionalAcceptanceDataProcess.jar \
yarn资源
Apps Submitted Apps PendingApps Running
脚本设置-ytm 666但是flink ui页面的,job manager--taskmanager.heap.size为1024
在 2020-04-14 14:10:31,"Xintong Song" 写道:
>启动命令看起来是对的。
>你说的不起作用,具体是什么现象呢?
>
>Thank you~
>
>Xintong Song
>
>
>
>On Tue, Apr 14, 2020 at 2:05 PM guanyq wrote:
>
>> ./bin/flink ru
的图片没有显示出来。
>建议把完整的启动命令贴一下。
>
>Thank you~
>
>Xintong Song
>
>
>
>On Tue, Apr 14, 2020 at 1:11 PM guanyq wrote:
>
>> flink 提交jar包是 指定-ytm不起作用。想知道什么原因?
>>
>>
>>
>>
flink 提交jar包是 指定-ytm不起作用。想知道什么原因?
请教个问题
ytm参数不生效,什么原因呀?
您好:
1.随着程序的运行,task内存中的状态会不断增加,迟早会出现内存溢出问题,想知道一般都如何解决这个问题?
您好:
Run a single Flink job on YARN模式下,
flink生产日志一般如何配置,及使用才能监控到任务运行是的异常和异常日志的。
辛苦了!
flink的一些方法或者类都被@deprecated修饰
1.如何找到相对应建议使用的方法或者类,来替换@deprecated修饰的类或方法?
2.如何知道为什么这些@deprecated修饰的类或方法被弃用呢?
package com.guanyq.study.libraries.stateProcessorApi.FsStateBackend;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import
74 matches
Mail list logo